当前位置:网站首页>MOSN 反向通道详解
MOSN 反向通道详解
2022-08-02 19:08:00 【InfoQ】

Part.1--贡献者前言
MOSN 项目概述
Part.2--MOSN 的反向通道实现

Part.3--反向通道启动过程
// AgentBootstrapConfig holds the JSON-configurable bootstrap settings of the
// reverse-channel agent.
type AgentBootstrapConfig struct {
	// Enable is the on/off switch read from "enable".
	// NOTE(review): presumably the agent is not started when false — confirm at the call site.
	Enable bool `json:"enable"`
	// ConnectionNum is the number of connections the agent establishes to
	// each server.
	ConnectionNum int `json:"connection_num"`
	// Cluster is the cluster of the remote server.
	Cluster string `json:"cluster"`
	// HostingListener names the listener that processes data transmission
	// once a connection has been established.
	HostingListener string `json:"hosting_listener"`
	// StaticServerList is the static list of remote servers. Note that its
	// JSON key is "server_list".
	StaticServerList []string `json:"server_list"`
	// DynamicServerListConfig specifies the dynamic server configuration.
	// The field carries no json tag, so encoding/json falls back to the Go
	// field name as the key.
	DynamicServerListConfig struct {
		DynamicServerLister string `json:"dynamic_server_lister"`
	}
	// ConnectRetryTimes is the configured number of connect retries.
	// NOTE(review): the retry loop in initConnection treats -1 as unlimited;
	// confirm that this field is what feeds it.
	ConnectRetryTimes int `json:"connect_retry_times"`
	// ReconnectBaseDurationMs is the base reconnect interval; the name
	// indicates milliseconds.
	ReconnectBaseDurationMs int `json:"reconnect_base_duration_ms"`
	// ConnectTimeoutDurationMs is the timeout for establishing a connection
	// and initializing the agent.
	ConnectTimeoutDurationMs int `json:"connect_timeout_duration_ms"`
	// CredentialPolicy is read from "credential_policy".
	// NOTE(review): presumably selects how the agent credential is produced — confirm.
	CredentialPolicy string `json:"credential_policy"`
	// GracefulCloseMaxWaitDurationMs is the maximum time to wait when closing
	// a connection gracefully.
	GracefulCloseMaxWaitDurationMs int `json:"graceful_close_max_wait_duration_ms"`
	// TLSContext is the optional TLS configuration read from "tls_context".
	TLSContext *v2.TLSConfig `json:"tls_context"`
}
// Start creates up to ConnectionNumPerAddress reverse connections and then
// sets up the aside (bypass control) connection.
//
// A connection whose initConnection call returns an error is simply left out
// of a.connections; the error is not reported from here.
func (a *AgentPeer) Start() {
	established := make([]*AgentClientConnection, 0, a.conf.ConnectionNumPerAddress)
	for n := 0; n < a.conf.ConnectionNumPerAddress; n++ {
		// Initialize and establish one reverse connection.
		c := NewAgentCoreConnection(*a.conf, a.listener)
		if err := c.initConnection(); err != nil {
			continue
		}
		established = append(established, c)
	}
	a.connections = established

	// Establish the aside control connection.
	a.initAside()
}
// initConnection establishes the reverse connection, retrying with a doubling
// backoff, and then hands the raw connection over to the hosting listener.
//
// a.connectRetryTimes is the maximum number of attempts; -1 means retry
// without limit. Any other value below 1 allows no attempt at all, which is
// reported as an error instead of hosting a connection that was never
// initialized.
//
// It returns an error when the connection has been closed, when no attempt is
// permitted, or when the last permitted attempt fails. On success it returns
// nil after starting the hosting goroutine.
func (a *connection) initConnection() error {
	unlimited := a.connectRetryTimes == -1
	if !unlimited && a.connectRetryTimes < 1 {
		return fmt.Errorf("connect retry times %v allows no connection attempt, address: %v", a.connectRetryTimes, a.address)
	}

	backoff := a.reconnectBaseDuration
	for attempt := 1; unlimited || attempt <= a.connectRetryTimes; attempt++ {
		if a.close.Load() {
			return fmt.Errorf("connection closed, don't attempt to connect, address: %v", a.address)
		}
		// 1. Initialize the physical connection and transmit the
		// reverse-connection metadata (per the original comment on init).
		err := a.init()
		if err == nil {
			break
		}
		if !unlimited && attempt == a.connectRetryTimes {
			// No attempts left: report and return right away rather than
			// announcing a retry and sleeping for nothing.
			log.DefaultLogger.Errorf("[agent] failed to connect remote server, giving up after %v attempts, address: %v, err: %+v", attempt, a.address, err)
			return err
		}
		// A time.Duration already prints with its unit, so no "seconds" suffix.
		log.DefaultLogger.Errorf("[agent] failed to connect remote server, try again after %v, address: %v, err: %+v", backoff, a.address, err)
		time.Sleep(backoff)
		// Double the backoff, but keep the previous value if doubling would
		// overflow; a non-positive duration would turn Sleep into a no-op.
		if doubled := backoff * 2; doubled > backoff {
			backoff = doubled
		}
	}

	// 2. Hand the established connection to the hosting listener.
	utils.GoWithRecover(func() {
		// ch is only passed to OnAccept and never read in this function.
		// NOTE(review): the buffer of 1 presumably keeps a send inside
		// OnAccept from blocking — confirm.
		ch := make(chan api.Connection, 1)
		a.listener.GetListenerCallbacks().OnAccept(a.rawc, a.listener.UseOriginalDst(), nil, ch, a.readBuffer.Bytes(), []api.ConnectionEventListener{a})
	}, nil)
	return nil
}
Part.4--交互过程


// ConnectionInitInfo carries the basic information of an agent host. It is
// sent immediately after the physical connection has been established.
type ConnectionInitInfo struct {
	// ClusterName is the name of the cluster, sent as "cluster_name".
	ClusterName string `json:"cluster_name"`
	// Weight is the host weight, sent as "weight".
	Weight int64 `json:"weight"`
	// HostName is the host name of the agent, sent as "host_name".
	HostName string `json:"host_name"`
	// CredentialPolicy is sent as "credential_policy"; the receiving filter
	// only enters its authentication branch when this is non-empty.
	CredentialPolicy string `json:"credential_policy"`
	// Credential is sent as "credential".
	// NOTE(review): presumably validated according to CredentialPolicy — confirm.
	Credential string `json:"credential"`
	// Extra holds arbitrary additional key/value data, sent as "extra".
	Extra map[string]interface{} `json:"extra"`
}
// handleConnectionInit processes the init message of a reverse connection:
// it checks that the requested cluster exists, acknowledges the agent, and
// appends the agent as a host of that cluster.
//
// It returns api.Stop on every path. t.connInitialized is set only after the
// host append call has been made.
func (t *tunnelFilter) handleConnectionInit(info *ConnectionInitInfo) api.FilterStatus {
	conn := t.readCallbacks.Connection()

	// Authenticate the connection.
	if info.CredentialPolicy != "" {
		// 1. Custom authentication; omitted in the article for brevity.
	}

	clusterName := info.ClusterName
	if !t.clusterManager.ClusterExist(clusterName) {
		// The write error is not checked on this rejection path.
		writeConnectResponse(ConnectClusterNotExist, conn)
		return api.Stop
	}

	if err := writeConnectResponse(ConnectSuccess, conn); err != nil {
		return api.Stop
	}

	remover := NewHostRemover(conn.RemoteAddr().String(), clusterName)
	conn.AddConnectionEventListener(remover)

	// The lock is held from the snapshot lookup until the function returns.
	tunnelHostMutex.Lock()
	defer tunnelHostMutex.Unlock()

	snapshot := t.clusterManager.GetClusterSnapshot(context.Background(), clusterName)

	// 2. Add the agent as a host of the requested cluster.
	hostConfig := v2.Host{
		HostConfig: v2.HostConfig{
			Address:  conn.RemoteAddr().String(),
			Hostname: info.HostName,
			// NOTE(review): int64 -> uint32 narrowing; negative or very
			// large weights wrap around.
			Weight:     uint32(info.Weight),
			TLSDisable: false,
		},
	}
	agentHost := NewHost(hostConfig, snapshot.ClusterInfo(), CreateAgentBackendConnection(conn))
	// The append error is deliberately discarded, as in the original.
	_ = t.clusterManager.AppendClusterTypesHosts(clusterName, []types.Host{agentHost})

	// Mark the connection as initialized; per the original comment,
	// subsequent data processing skips this filter.
	t.connInitialized = true
	return api.Stop
}

Part.5--总结和规划
了解更多…
边栏推荐
猜你喜欢
动态折线图,制作原来是这么简单
idea 配置resin
中国科学院院属研究单位
What is the use of IT assets management software
I have 8 years of experience in the Ali test, and I was able to survive by relying on this understanding.
【动态规划专项训练】基础篇
中职网络安全竞赛之应用服务漏洞扫描与利用
selenium installation and environment configuration firefox
Three components of NIO foundation
NC | Structure and function of soil microbiome reveal N2O release from global wetlands
随机推荐
【C语言刷题】Leetcode238——除自身以外数组的乘积
Golang swagger :missing required param comment parameters
86.(cesium之家)cesium叠加面接收阴影效果(gltf模型)
服务器Centos7 静默安装Oracle Database 12.2
spack install reports an error /tmp/ccBDQNaB.s: Assembler message:
What is the use of IT assets management software
ssh configuration
3年半测试经验,20K我都没有,看来是时候跳槽了
【LeetCode】118. 杨辉三角 - Go 语言题解
基于OpenGL的冰川与火鸟(光照计算模型、视景体、粒子系统)
JVM内存和垃圾回收-05.虚拟机栈
AI智能剪辑,仅需2秒一键提取精彩片段
请教一个数据库连接池的问题,目前已知是事务未设置超时,又有一块代码事务没有提交,一直把连接给耗尽了,
Dynamically generate different types of orders, how do I deposit to mongo database?
LeetCode每日一题(324. Wiggle Sort II)
让你的应用完美适配平板
栈、队列和数组
光源控制器接口定义说明
如何正确地配置入口文件?
2022-07-28