A detailed look at the implementation of the gRPC client long-connection mechanism
2022-07-26 18:20:00
Author: Xiong Miaojun. Original: https://pandaychen.github.io/2020/09/01/GRPC-CLIENT-CONN-LASTING/ (reprinted)
0x00 Preface
HTTP2 is a full-duplex streaming protocol: the server can actively ping the client, and it also exposes configuration for checking connection availability and for limiting how often clients may send ping frames. gRPC uses HTTP2 as its underlying transport, so gRPC client connections are long-lived by default. A common scenario is needing a durable long connection between client and server: whether the server or the client disconnects or restarts abnormally, the connection should be retried and kept alive (provided, of course, that both sides come back up). In gRPC, when the server of an established connection restarts abnormally, the client usually receives the following error:
rpc error: code = Unavailable desc = transport is closing
Most gRPC client wrappers do not handle this case (see the Warden issue on client retry after the server-side service restarts and the connection drops [1]). For this kind of error, two remedies are recommended:
- Retry: when a client call fails, retry gracefully with exponential backoff (a minimal sketch follows this list).
- Keepalive: add a keepalive liveness strategy together with an auto-reconnect policy.
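For the first recommendation, here is a minimal sketch of exponential-backoff retry around a unary call; the attempt count, delays, and the restriction to codes.Unavailable are illustrative choices, not a prescribed policy:

import (
	"context"
	"time"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// callWithBackoff retries fn while it fails with codes.Unavailable
// (the error shown above), doubling the delay between attempts.
func callWithBackoff(ctx context.Context, fn func(context.Context) error) error {
	const maxAttempts = 5
	delay := 100 * time.Millisecond
	var err error
	for i := 0; i < maxAttempts; i++ {
		if err = fn(ctx); status.Code(err) != codes.Unavailable {
			return err // success, or an error that retrying won't fix
		}
		select {
		case <-time.After(delay):
			delay *= 2 // exponential backoff; production code would add jitter and a cap
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return err
}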
This article analyzes how to implement such client-side keepalive logic. Before diving in, let's review gRPC's keepalive mechanism [2].
0x01 The HTTP2 GOAWAY frame
HTTP2 uses the GOAWAY frame to signal connection shutdown: it either initiates a graceful close or signals a serious error condition. Its semantics allow an endpoint to stop accepting new streams while still completing the processing of previously established streams. When the client receives this frame it actively closes the connection, and a new connection is established the next time data needs to be sent. GOAWAY is also an important building block of gRPC's GracefulStop mechanism.
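On the server side this is visible through the public API: GracefulStop stops accepting new streams, signals clients via GOAWAY, and waits for in-flight RPCs to finish. A minimal sketch of a shutdown path built on it (listener setup and service registration omitted; the SIGTERM handling is an illustrative choice):

import (
	"net"
	"os"
	"os/signal"
	"syscall"

	"google.golang.org/grpc"
)

// serve runs s until SIGTERM, then calls GracefulStop: the server sends
// GOAWAY, stops accepting new streams, and waits for in-flight RPCs to
// complete before Serve returns.
func serve(s *grpc.Server, lis net.Listener) error {
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, syscall.SIGTERM)
	go func() {
		<-sig
		s.GracefulStop()
	}()
	return s.Serve(lis)
}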
0x02 gRPC client keepalive
The gRPC client offers the following keepalive configuration:
var kacp = keepalive.ClientParameters{
Time: 10 * time.Second, // send pings every 10 seconds if there is no activity
Timeout: time.Second, // wait 1 second for ping ack before considering the connection dead
PermitWithoutStream: true, // send pings even without active streams
}
// pass the keepalive configuration to grpc.Dial
conn, err := grpc.Dial(*addr, grpc.WithInsecure(), grpc.WithKeepaliveParams(kacp))
The fields of keepalive.ClientParameters mean:
- Time: if there is no activity, send a ping every 10s
- Timeout: if the ping ack does not come back within 1s, consider the connection dead
- PermitWithoutStream: whether pings may be sent even when there is no active stream

Relatedly, the ssh client [3] and the mysql client contain similar implementations, where a dedicated goroutine drives the keepalive, as in the following code (using ssh as an example):
go func() {
t := time.NewTicker(2 * time.Second)
defer t.Stop()
for range t.C {
_, _, err := client.Conn.SendRequest("keepalive@openssh.com", true, nil)
if err != nil {
return
}
}
}()
Implementation in gRPC
In grpc-go's newHTTP2Client [4] method there is the following logic: every newly created HTTP2 client starts a goroutine to handle keepalive:
// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
// and starts to receive messages on it. Non-nil error returns if construction
// fails.
func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (_ *http2Client, err error) {
...
if t.keepaliveEnabled {
t.kpDormancyCond = sync.NewCond(&t.mu)
go t.keepalive()
}
...
}
Next, look at the implementation of the keepalive method [5]:
func (t *http2Client) keepalive() {
p := &ping{data: [8]byte{}} // the ping payload: 8 zero bytes
timer := time.NewTimer(t.kp.Time) // fires after the configured keepalive Time
// main keepalive loop
for {
select {
// the keepalive timer fired
case <-timer.C:
if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
timer.Reset(t.kp.Time)
continue
}
// Check if keepalive should go dormant.
t.mu.Lock()
if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
// Make awakenKeepalive writable.
<-t.awakenKeepalive
t.mu.Unlock()
select {
case <-t.awakenKeepalive:
// If the control gets here a ping has been sent
// need to reset the timer with keepalive.Timeout.
case <-t.ctx.Done():
return
}
} else {
t.mu.Unlock()
if channelz.IsOn() {
atomic.AddInt64(&t.czData.kpCount, 1)
}
// Send ping.
t.controlBuf.put(p)
}
// By the time control gets here a ping has been sent one way or the other.
timer.Reset(t.kp.Timeout)
select {
case <-timer.C:
if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
timer.Reset(t.kp.Time)
continue
}
t.Close()
return
case <-t.ctx.Done():
if !timer.Stop() {
<-timer.C
}
return
}
// the upper layer cancelled the context
case <-t.ctx.Done():
if !timer.Stop() {
// Stop returned false: the timer already fired, so drain the channel
<-timer.C
}
return
}
}
}
The execution logic of the client keepalive implementation:
- Build the ping payload ([8]byte{}) and create a timer that first fires after Time.
- Loop over a select with two branches: one runs when the timer fires; the other is t.ctx.Done(), i.e. the application above keepalive called cancel to end this context subtree.
- The core logic sits in the timer-fired branch: if there was activity since the last tick, simply reset the timer; otherwise send a ping (or go dormant when there are no active streams and PermitWithoutStream is false), reset the timer to Timeout, and close the transport if no activity is seen before it fires again.
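From application code, the effect of t.Close() above can be observed through the connection's state machine: the *grpc.ClientConn typically drops to TRANSIENT_FAILURE and grpc-go starts re-dialing in the background. A minimal sketch of a state watcher (the logging is illustrative):

import (
	"context"
	"log"

	"google.golang.org/grpc"
)

// watchState logs every connectivity state transition (IDLE, CONNECTING,
// READY, TRANSIENT_FAILURE, SHUTDOWN) until ctx is cancelled.
func watchState(ctx context.Context, conn *grpc.ClientConn) {
	for {
		s := conn.GetState()
		log.Printf("connectivity: %v", s)
		if !conn.WaitForStateChange(ctx, s) {
			return // ctx cancelled or deadline exceeded
		}
	}
}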
0x03 gRPC server-side keepalive
The gRPC server side has two main pieces of logic: responding to client ping frames, and running a dedicated goroutine that checks whether the client is still alive. The server-side keepalive configuration comes in two parts, keepalive.EnforcementPolicy and keepalive.ServerParameters, as follows:
var kaep = keepalive.EnforcementPolicy{
MinTime: 5 * time.Second, // If a client pings more than once every 5 seconds, terminate the connection
PermitWithoutStream: true, // Allow pings even when there are no active streams
}
var kasp = keepalive.ServerParameters{
MaxConnectionIdle: 15 * time.Second, // If a client is idle for 15 seconds, send a GOAWAY
MaxConnectionAge: 30 * time.Second, // If any connection is alive for more than 30 seconds, send a GOAWAY
MaxConnectionAgeGrace: 5 * time.Second, // Allow 5 seconds for pending RPCs to complete before forcibly closing connections
Time: 5 * time.Second, // Ping the client if it is idle for 5 seconds to ensure the connection is still active
Timeout: 1 * time.Second, // Wait 1 second for the ping ack before assuming the connection is dead
}
func main(){
...
s := grpc.NewServer(grpc.KeepaliveEnforcementPolicy(kaep), grpc.KeepaliveParams(kasp))
...
}
keepalive.EnforcementPolicy:
- MinTime: if the interval between two client pings is less than 5s, close the connection
- PermitWithoutStream: allow pings even when there is no active stream

keepalive.ServerParameters:
- MaxConnectionIdle: if a client is idle for more than 15s, send a GOAWAY; to avoid sending a large number of GOAWAYs at the same moment, the interval fluctuates by ±10% around 15s, i.e. between 13.5s and 16.5s
- MaxConnectionAge: if any connection has been alive for more than 30s, send a GOAWAY
- MaxConnectionAgeGrace: after that, allow 5s for pending RPCs to complete before forcibly closing the connection
- Time: if a client is idle for more than 5s, send it a ping
- Timeout: if the ping gets no reply within 1s, consider the connection dead
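grpc-go produces this fluctuation with an internal jitter helper; a sketch of the computation, with math/rand standing in for grpc-go's internal random source:

import (
	"math/rand"
	"time"
)

// jitter returns a random duration in [-v/10, +v/10), i.e. ±10% of v
// (v is assumed positive), so that connections created at the same time
// don't all hit their limit at the same instant.
func jitter(v time.Duration) time.Duration {
	r := int64(v / 10)
	return time.Duration(rand.Int63n(2*r) - r)
}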
Implementation in gRPC
The server's handling of client ping frames lives in the handlePing method [6]. handlePing checks whether the ping violates either policy; if so it increments pingStrikes, and once the count exceeds maxPingStrikes (2) it logs an error, sends a goAway frame, and drops the connection. The implementation:
func (t *http2Server) handlePing(f *http2.PingFrame) {
if f.IsAck() {
if f.Data == goAwayPing.data && t.drainChan != nil {
close(t.drainChan)
return
}
// Maybe it's a BDP ping.
if t.bdpEst != nil {
t.bdpEst.calculate(f.Data)
}
return
}
pingAck := &ping{ack: true}
copy(pingAck.data[:], f.Data[:])
t.controlBuf.put(pingAck)
now := time.Now()
defer func() {
t.lastPingAt = now
}()
// A reset ping strikes means that we don't need to check for policy
// violation for this ping and the pingStrikes counter should be set
// to 0.
if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
t.pingStrikes = 0
return
}
t.mu.Lock()
ns := len(t.activeStreams)
t.mu.Unlock()
if ns < 1 && !t.kep.PermitWithoutStream {
// Keepalive shouldn't be active thus, this new ping should
// have come after at least defaultPingTimeout.
if t.lastPingAt.Add(defaultPingTimeout).After(now) {
t.pingStrikes++
}
} else {
// Check if keepalive policy is respected.
if t.lastPingAt.Add(t.kep.MinTime).After(now) {
t.pingStrikes++
}
}
if t.pingStrikes > maxPingStrikes {
// Send goaway and close the connection.
if logger.V(logLevel) {
logger.Errorf("transport: Got too many pings from the client, closing the connection.")
}
t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
}
}
Note the two conditions under which pingStrikes accumulates:
- with no active streams and PermitWithoutStream disabled: t.lastPingAt.Add(defaultPingTimeout).After(now), i.e. the ping arrived sooner than defaultPingTimeout after the previous one;
- otherwise: t.lastPingAt.Add(t.kep.MinTime).After(now), i.e. the ping arrived sooner than the enforced MinTime.
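Seen from the client side: a client that pings faster than the server's MinTime will accumulate pingStrikes until the server answers with GOAWAY (ENHANCE_YOUR_CALM, debug data too_many_pings) and closes the connection. A sketch of a client configuration that would violate the server policy shown earlier; the exact values are illustrative:

import (
	"time"

	"google.golang.org/grpc/keepalive"
)

// a client whose idle ping interval (1s) is far below the server's
// MinTime (5s in the policy above): pingStrikes climbs past
// maxPingStrikes and the server closes the connection with
// "too_many_pings"
var tooAggressive = keepalive.ClientParameters{
	Time:                time.Second,
	Timeout:             500 * time.Millisecond,
	PermitWithoutStream: true,
}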
keepalive implementation
When gRPC creates a new HTTP2 server, it starts a separate goroutine to handle the keepalive logic; see the newHTTP2Server method [7]:
func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
...
go t.keepalive()
...
}
A brief look at the keepalive implementation: the core is three timers, maxIdle, maxAge and keepAlive, whose events are handled in a for/select loop:
- maxIdle: checks whether the client has been idle longer than configured; if so, calls t.drain, which sends a GOAWAY frame
- maxAge: on firing, calls t.drain to send a GOAWAY, then resets the timer to MaxConnectionAgeGrace; when it fires again, calls t.Close() to shut the connection down (which is what makes the close graceful)
- keepAlive: checks whether activity is 1; if not, sets pingSent to true and sends a ping, then resets the timer to Timeout; if on the next firing activity is still not 1 (i.e. no ping reply) and pingSent is true, calls t.Close() to close the connection
func (t *http2Server) keepalive() {
p := &ping{}
var pingSent bool
maxIdle := time.NewTimer(t.kp.MaxConnectionIdle)
maxAge := time.NewTimer(t.kp.MaxConnectionAge)
keepalive := time.NewTimer(t.kp.Time)
// NOTE: All exit paths of this function should reset their
// respective timers. A failure to do so will cause the
// following clean-up to deadlock and eventually leak.
defer func() {
// stop and drain all timers before exiting
if !maxIdle.Stop() {
<-maxIdle.C
}
if !maxAge.Stop() {
<-maxAge.C
}
if !keepalive.Stop() {
<-keepalive.C
}
}()
for {
select {
case <-maxIdle.C:
t.mu.Lock()
idle := t.idle
if idle.IsZero() { // The connection is non-idle.
t.mu.Unlock()
maxIdle.Reset(t.kp.MaxConnectionIdle)
continue
}
val := t.kp.MaxConnectionIdle - time.Since(idle)
t.mu.Unlock()
if val <= 0 {
// The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
// Gracefully close the connection.
t.drain(http2.ErrCodeNo, []byte{})
// Resetting the timer so that the clean-up doesn't deadlock.
maxIdle.Reset(infinity)
return
}
maxIdle.Reset(val)
case <-maxAge.C:
t.drain(http2.ErrCodeNo, []byte{})
maxAge.Reset(t.kp.MaxConnectionAgeGrace)
select {
case <-maxAge.C:
// Close the connection after grace period.
t.Close()
// Resetting the timer so that the clean-up doesn't deadlock.
maxAge.Reset(infinity)
case <-t.ctx.Done():
}
return
case <-keepalive.C:
if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
pingSent = false
keepalive.Reset(t.kp.Time)
continue
}
if pingSent {
t.Close()
// Resetting the timer so that the clean-up doesn't deadlock.
keepalive.Reset(infinity)
return
}
pingSent = true
if channelz.IsOn() {
atomic.AddInt64(&t.czData.kpCount, 1)
}
t.controlBuf.put(p)
keepalive.Reset(t.kp.Timeout)
case <-t.ctx.Done():
return
}
}
}
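The stop-and-drain sequence used on every exit path above is a general time.Timer idiom in Go, worth isolating. A standalone sketch (it assumes, as the transport code does, that nothing else has consumed the timer's channel):

import "time"

// stopTimer stops t; if t already fired, it drains the channel so that a
// stale value is not left sitting in t.C.
func stopTimer(t *time.Timer) {
	if !t.Stop() {
		<-t.C // Stop returned false: the timer fired, consume the value
	}
}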
0x04 Implementing a robust long-connection client
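Combining the pieces above (keepalive parameters, retry on Unavailable, and reconnect backoff), a minimal sketch of such a client could look like the following. The address and tuning values are illustrative, and it assumes the server registers the standard gRPC health service; any unary stub call works in its place:

package main

import (
	"context"
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/backoff"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/status"

	healthpb "google.golang.org/grpc/health/grpc_health_v1"
)

func main() {
	// keepalive: probe every 10s, declare the transport dead if the
	// ping ack takes longer than 1s
	kacp := keepalive.ClientParameters{
		Time:                10 * time.Second,
		Timeout:             time.Second,
		PermitWithoutStream: true,
	}
	// grpc-go re-dials broken connections by itself; ConnectParams only
	// tunes the exponential backoff of those reconnect attempts
	conn, err := grpc.Dial("127.0.0.1:50051",
		grpc.WithInsecure(),
		grpc.WithKeepaliveParams(kacp),
		grpc.WithConnectParams(grpc.ConnectParams{
			Backoff: backoff.Config{
				BaseDelay:  time.Second,
				Multiplier: 1.6,
				Jitter:     0.2,
				MaxDelay:   30 * time.Second,
			},
		}),
	)
	if err != nil {
		log.Fatalf("dial: %v", err)
	}
	defer conn.Close()

	hc := healthpb.NewHealthClient(conn)
	for {
		ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
		_, err := hc.Check(ctx, &healthpb.HealthCheckRequest{})
		cancel()
		switch status.Code(err) {
		case codes.OK:
			log.Println("server healthy")
		case codes.Unavailable:
			// transport is down or the server is restarting; grpc-go is
			// already reconnecting with the backoff configured above
			log.Printf("server unavailable, will retry: %v", err)
		default:
			log.Printf("health check failed: %v", err)
		}
		time.Sleep(5 * time.Second)
	}
}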
References
[1] Warden: client retry after the server-side service restarts and the connection drops: https://github.com/go-kratos/kratos/issues/177
[2] keepalive mechanism: https://github.com/grpc/grpc/blob/master/doc/keepalive.md
[3] ssh client: https://pandaychen.github.io/2019/10/20/HOW-TO-BUILD-A-SSHD-WITH-GOLANG/
[4] newHTTP2Client: https://github.com/grpc/grpc-go/blob/master/internal/transport/http2_client.go#L166
[5] keepalive method: https://github.com/grpc/grpc-go/blob/master/internal/transport/http2_client.go#L1350
[6] handlePing method: https://github.com/grpc/grpc-go/blob/master/internal/transport/http2_server.go#L693
[7] newHTTP2Server method: https://github.com/grpc/grpc-go/blob/master/internal/transport/http2_server.go#L129
[8] keepalive server example: https://github.com/grpc/grpc-go/blob/master/examples/features/keepalive/server/main.go
[9] keepalive client example: https://github.com/grpc/grpc-go/blob/master/examples/features/keepalive/client/main.go
[10] gRPC unpacking manual: https://juejin.im/post/6844904096474857485