当前位置:网站首页>[kitex source code interpretation] service discovery
[kitex source code interpretation] service discovery
2022-07-26 18:19:00 【InfoQ】
Kitex
Service discovery
- Clients do , The client gets one or a group of from the registry ( Then choose one according to the algorithm );
- Do it on the server side , The client directly requests an address , Then do load balancing and forward to one of the service nodes , for example k8s Of service .
usage
func main() {
// Get an instance of the service discovery component through the default configuration
// The method has a default configuration to create a nacos client , Convenient for follow-up and nacos Interaction
r, err := resolver.NewDefaultNacosResolver()
if err != nil {
panic(err)
}
newClient := hello.MustNewClient(
"Hello",
// With WithOption Service discovery instances
client.WithResolver(r),
client.WithRPCTimeout(time.Second*3),
)
for {
resp, err := newClient.Echo(context.Background(), &api.Request{Message: "Hello"})
if err != nil {
log.Fatal(err)
}
log.Println(resp)
time.Sleep(time.Second)
}
}
The exploration process
Start server
go run example/custom-config/server/main.go
Start client
go run example/custom-config/client/main.go
Source code analysis
client.WithResolver



- NewClient when , On initialization ;
- When called remotely ,middleware Service discovery processing in ;
NewClient initialization

initLBCache
type kClient struct {
svcInfo *serviceinfo.ServiceInfo
mws []endpoint.Middleware
eps endpoint.Endpoint
sEps endpoint.Endpoint
opt *client.Options
lbf *lbcache.BalancerFactory
inited bool
closed bool
}type BalancerFactory struct {
Hookable
opts Options
cache sync.Map // key -> LoadBalancer
resolver discovery.Resolver
balancer loadbalance.Loadbalancer
rebalancer loadbalance.Rebalancer
sfg singleflight.Group
}unc (kc *kClient) initLBCache() error {
resolver := kc.opt.Resolver
if resolver == nil {
// Generate a carry kerrors.ErrNoResolver FALSE resolver
}
balancer := kc.opt.Balancer
// Weight based load balancing is used by default
// see https://www.cloudwego.io/zh/docs/kitex/tutorials/basic-feature/loadbalance/
if balancer == nil {
balancer = loadbalance.NewWeightedBalancer()
}
cacheOpts := lbcache.Options{DiagnosisService: kc.opt.DebugService}
if kc.opt.BalancerCacheOpt != nil {
cacheOpts = *kc.opt.BalancerCacheOpt
}
// 1、 Generate a BalancerFactory object ;
// 2、 Check cacheOpts Refresh interval settings , If not, it defaults to 5s
// 3、 Check cacheOpts Cache expiration time setting , If not, it defaults to 15s
// 4、 The asynchronous collaboration periodically marks that the information discovered by the service is out of date , The details are BalancerFactory.cache Field
// 5、 The final will be resolver + balancer + cacheOpts.RefreshInterval + cacheOpts.ExpireInterval by key,
// The cache to balancerFactories Of sync.Map On
kc.lbf = lbcache.NewBalancerFactory(resolver, balancer, cacheOpts)
// ...
return nil
}initMiddlewares
// Middleware deal with input Endpoint and output Endpoint.
type Middleware func(Endpoint) Endpoint
// ad locum , We just need to know newResolveMWBuilder Back to endpoint.Middleware The function of
// We will analyze the specific logic later
func (kc *kClient) initMiddlewares(ctx context.Context) {
// ...
if kc.opt.Proxy == nil {
// ...
kc.mws = append(kc.mws, newResolveMWBuilder(kc.lbf)(ctx))
// ...
} else {
if kc.opt.Resolver != nil { // Custom service discovery
kc.mws = append(kc.mws, newResolveMWBuilder(kc.lbf)(ctx))
}
kc.mws = append(kc.mws, newProxyMW(kc.opt.Proxy))
}
// ...
}
// After the above initialization process , It is the assembly of a series of things that need to be used Middleware To kClient.mws In this slice buildInvokeChain
func (kc *kClient) buildInvokeChain() error {
// Build a that sends requests and receives responses Endpoint, Mainly interact with the remote
innerHandlerEp, err := kc.invokeHandleEndpoint()
if err != nil {
return err
}
// Match the call processing logic with all the processed in the previous step Middleware In series , Put it in kClient.eps in
kc.eps = endpoint.Chain(kc.mws...)(innerHandlerEp)
// ...
return nil
}Pay attention to the key points
middleware Service discovery
kClient.eps
kClient.epsendpoint.Endpoint


newResolveMWBuilder
kClient.initMiddlewaresendpoint.MiddlewarekClient.Callkc.opstype RPCInfo interface {
From() EndpointInfo
To() EndpointInfo
Invocation() Invocation
Config() RPCConfig
Stats() RPCStats
}
type Balancer struct {
b *BalancerFactory
target string // a description returned from the resolver's Target method
res atomic.Value // newest and previous discovery result
expire int32 // 0 = normal, 1 = expire and collect next ticker
sharedTicker *sharedTicker
}
return func(ctx context.Context, request, response interface{}) error {
// Get RPC Related information
rpcInfo := rpcinfo.GetRPCInfo(ctx)
dest := rpcInfo.To()
if dest == nil {
return kerrors.ErrNoDestService
}
remote := remoteinfo.AsRemoteInfo(dest)
if remote == nil {
err := fmt.Errorf("unsupported target EndpointInfo type: %T", dest)
return kerrors.ErrInternalException.WithCause(err)
}
if remote.GetInstance() != nil {
return next(ctx, request, response)
}
// 1、 from BalancerFactory Get the equalizer corresponding to the target service Balancer Etc
// 2、 There is also a cache here , The refresh mechanism is set according to cacheOpts.RefreshInterval The time is refreshed regularly
// reference Balancer.refresh Code for (https://github.com/cloudwego/kitex/blob/develop/pkg/loadbalance/lbcache/cache.go#L184)
// 3、 According to dest You can get different Balancer
lb, err := lbf.Get(ctx, dest)
if err != nil {
return kerrors.ErrServiceDiscovery.WithCause(err)
}
// picker yes BalancerFactory.balancer Corresponding to the equalizer set
// picker Of The main logic is to select one from a set of service instance information , As default WeightedRandom Weight based randomization
picker := lb.GetPicker()
if r, ok := picker.(internal.Reusable); ok {
defer r.Recycle()
}
var lastErr error
for {
select {
case <-ctx.Done():
return kerrors.ErrRPCTimeout
default:
}
// Specifically, select one of the nodes , For example, you can refer to : randomPicker.Next
ins := picker.Next(ctx, request)
if ins == nil {
return kerrors.ErrNoMoreInstance.WithCause(fmt.Errorf("last error: %w", lastErr))
}
remote.SetInstance(ins)
// TODO: generalize retry strategy
if err = next(ctx, request, response); err != nil && retryable(err) {
lastErr = err
continue
}
return err
}
}边栏推荐
- [ Kitex 源码解读 ] 服务发现
- Point cloud target detection Kitti dataset bin file visualization, one-stop solution
- 成为测试/开发程序员,小张:现实就来了个下马威......
- web项目文件简单上传和下载
- 2、 Topic communication principle, code implementation
- How about the employment prospects of Russian translation? How to do a good job of Russian translation
- SSM练习第五天
- 老子云携手福昕鲲鹏,首次实现3D OFD三维版式文档的重大突破
- Leetcode 50 day question brushing plan (day 3 - concatenate substrings of all words 10.00-13.20)
- Oracle第一天(开发常用的知识点再回顾整理下)
猜你喜欢

Several ways to resolve hash conflicts

The second set of 2020 American Asian individual match
![[training day3] delete](/img/7b/40bfb7710427696a27796428d849ba.png)
[training day3] delete

【数字IC】深入浅出理解AXI-Lite协议

How about the employment prospects of Russian translation? How to do a good job of Russian translation

ICML 2022 (Part 4) | | graph hierarchical alignment graph kernel to realize graph matching

AI zhetianchuan DL regression and classification
![[training Day1] spy dispatch](/img/cd/34845de1093c777f6ecff35d9452c3.png)
[training Day1] spy dispatch

CentOS installs docker and MySQL and redis environments

AI zhetianchuan ml unsupervised learning
随机推荐
4、 Service communication principle, code implementation
Vector CANoe Menu Plugin拓展入门
ICML 2022 (Part 4) | | graph hierarchical alignment graph kernel to realize graph matching
1、 Header file, output format,::, namespace
8.1 Diffie-Hellman密钥交换
菜鸟 CPaaS 平台微服务治理实践
2022年的PMP考试大纲是什么?
SSM练习第五天
Continue to work hard on your skills, and the more you learn, the more you will learn
[training day3] section
2、 Topic communication principle, code implementation
Simple uploading and downloading of Web project files
LeetCode 0139. 单词拆分
钉钉第三方服务商应用ISV应用开发及上架教程
AI sky covering DL multilayer perceptron
BulletGraph(子弹图、项目符号图)
AI zhetianchuan DL regression and classification
Oracle第一天(开发常用的知识点再回顾整理下)
【英雄哥七月集训】第 25天: 树状数组
Click hijacking attack