Etcd database source code analysis - put process of server
2022-06-29 19:49:00 【Tertium ferrugosum】
The service registration section of the earlier article, "etcd database source code analysis: etcd gRPC service API", covered the gRPC service registration code. The server-side Put flow belongs to KVServer.
The RegisterKVServer function is defined in /api/etcdserverpb/rpc.pb.go. In the call pb.RegisterKVServer(grpcServer, NewQuotaKVServer(s)), s is the EtcdServer struct, the most important structure in etcd.
func RegisterKVServer(s *grpc.Server, srv KVServer) {
	s.RegisterService(&_KV_serviceDesc, srv)
}
The same file, /api/etcdserverpb/rpc.pb.go, defines the service descriptor with its ServiceName and MethodName entries. From there you can find the handler for the Put method, _KV_Put_Handler.
The key statement in _KV_Put_Handler is srv.(KVServer).Put(ctx, in). Because the registered server is a quotaKVServer, this actually calls quotaKVServer.(KVServer).Put(ctx, in).
func _KV_Put_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
	in := new(PutRequest)
	if err := dec(in); err != nil {
		return nil, err
	}
	if interceptor == nil {
		return srv.(KVServer).Put(ctx, in)
	}
	info := &grpc.UnaryServerInfo{
		Server:     srv,
		FullMethod: "/etcdserverpb.KV/Put",
	}
	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
		return srv.(KVServer).Put(ctx, req.(*PutRequest))
	}
	return interceptor(ctx, in, info, handler)
}
As NewQuotaKVServer shows, quotaKVServer is built by composition: it embeds the pb.KVServer interface. NewQuotaKVServer calls NewKVServer to initialize the kvServer struct, which implements pb.KVServer, and stores it in the embedded field.
// server/etcdserver/api/v3rpc/quota.go
type quotaKVServer struct {
	pb.KVServer
	qa quotaAlarmer
}

func NewQuotaKVServer(s *etcdserver.EtcdServer) pb.KVServer {
	return &quotaKVServer{
		NewKVServer(s),
		quotaAlarmer{newBackendQuota(s, "kv"), s, s.MemberId()},
	}
}
The etcdserver.RaftKV interface declares the same methods as the KVServer interface. In pb.RegisterKVServer(grpcServer, NewQuotaKVServer(s)), s is the EtcdServer struct, and NewKVServer stores it in the kv field, so the etcdserver.RaftKV value here is the EtcdServer itself.
type kvServer struct {
	hdr header // The header information used to populate the response message
	kv  etcdserver.RaftKV
	// maxTxnOps is the max operations per txn.
	// e.g suppose maxTxnOps = 128.
	// Txn.Success can have at most 128 operations,
	// and Txn.Failure can have at most 128 operations.
	maxTxnOps uint
}

func NewKVServer(s *etcdserver.EtcdServer) pb.KVServer {
	return &kvServer{hdr: newHeader(s), kv: s, maxTxnOps: s.Cfg.MaxTxnOps}
}

func (s *kvServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
	if err := checkPutRequest(r); err != nil {
		return nil, err
	}
	resp, err := s.kv.Put(ctx, r)
	if err != nil {
		return nil, togRPCError(err)
	}
	s.hdr.fill(resp.Header)
	return resp, nil
}
The Put method of kvServer works as follows. It first validates the request with checkPutRequest. If validation passes, it hands the request to the wrapped RaftKV interface for processing. When the response comes back, it fills in the response header through header.fill() and returns the complete response to the client. Tracing the call further, srv.(KVServer).Put(ctx, in) therefore ends up in (s *EtcdServer).Put(), defined in server/etcdserver/v3_server.go.
// server/etcdserver/v3_server.go
func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
	ctx = context.WithValue(ctx, traceutil.StartTimeKey, time.Now())
	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Put: r})
	if err != nil {
		return nil, err
	}
	return resp.(*pb.PutResponse), nil
}

func (s *EtcdServer) raftRequest(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error) {
	return s.raftRequestOnce(ctx, r)
}

func (s *EtcdServer) raftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error) {
	result, err := s.processInternalRaftRequestOnce(ctx, r)
	if err != nil {
		return nil, err
	}
	if result.Err != nil {
		return nil, result.Err
	}
	if startTime, ok := ctx.Value(traceutil.StartTimeKey).(time.Time); ok && result.Trace != nil {
		applyStart := result.Trace.GetStartTime()
		// The trace object is created in toApply. Here reset the start time to trace
		// the raft request time by the difference between the request start time
		// and toApply start time
		result.Trace.SetStartTime(startTime)
		result.Trace.InsertStep(0, applyStart, "process raft request")
		result.Trace.LogIfLong(traceThreshold)
	}
	return result.Resp, nil
}
The flow above shows that the final call is (s *EtcdServer).processInternalRaftRequestOnce(…). The key call inside this function is s.r.Propose(cctx, data), where s is the EtcdServer and r is its raftNode member. This is the point at which the request enters the Raft protocol.
func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (*apply2.Result, error) {
	ai := s.getAppliedIndex()
	ci := s.getCommittedIndex()
	if ci > ai+maxGapBetweenApplyAndCommitIndex {
		return nil, errors.ErrTooManyRequests
	}
	r.Header = &pb.RequestHeader{
		ID: s.reqIDGen.Next(),
	}
	// check authinfo if it is not InternalAuthenticateRequest
	if r.Authenticate == nil {
		authInfo, err := s.AuthInfoFromCtx(ctx)
		if err != nil {
			return nil, err
		}
		if authInfo != nil {
			r.Header.Username = authInfo.Username
			r.Header.AuthRevision = authInfo.Revision
		}
	}
	data, err := r.Marshal()
	if err != nil {
		return nil, err
	}
	if len(data) > int(s.Cfg.MaxRequestBytes) {
		return nil, errors.ErrRequestTooLarge
	}
	id := r.ID
	if id == 0 {
		id = r.Header.ID
	}
	ch := s.w.Register(id)
	cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
	defer cancel()
	start := time.Now()
	err = s.r.Propose(cctx, data)
	if err != nil {
		proposalsFailed.Inc()
		s.w.Trigger(id, nil) // GC wait
		return nil, err
	}
	proposalsPending.Inc()
	defer proposalsPending.Dec()
	select {
	case x := <-ch:
		return x.(*apply2.Result), nil
	case <-cctx.Done():
		proposalsFailed.Inc()
		s.w.Trigger(id, nil) // GC wait
		return nil, s.parseProposeCtxErr(cctx.Err(), start)
	case <-s.done:
		return nil, errors.ErrStopped
	}
}