当前位置:网站首页>Details of dapr implementing distributed stateful service
Details of dapr implementing distributed stateful service
2020-11-06 01:35:00 【itread01】
Dapr It's a cross language design for the cloud environment , Event driven , It is convenient to build micro service system . balabala a pile , Interested partners can go to learn about .
Dapr Providing stateful and stateless microservices . Most people do stateless Services ( Microservices ) Of , It's just that being stateless in some areas doesn't work well , Because the cost is too much ; Stateful services have fixed scenarios , It requires a small cost , Both latency and throughput are high . Talk less nonsense , Look directly at Dapr How to implement stateful service .
Let's take a look at stateful services first :
1. Stable routing
Send to A Requests from server , Can't send to B Server , Otherwise it's stateless
2. Status
The state is stored in its own server , Instead of storing it remotely , There is a clear difference between this and stateless , So stateless services need to use redis This kind of thing accelerates , There's no need to
3. Processing is a single thread
States are generally more complex , It's difficult to do parallel computing on a complex thing ; Of course A and B There is no relationship between the logic of , In fact, it can be parallel , however A Its own logical execution needs sequence execution .
For a stateful service (dapr), Realize 23 It's actually very relaxing , There are even some things that users need to implement , therefore 1 That's the point , The current message ( Ask for ) Which server needs to be sent to for processing is the most critical , It even determines what system he is .
Determine which request's destination address , This thing is called... In a decentralized system Placement, Sometimes it's called Naming. TiDB There is a Server It's called PlacementDriver, Abbreviation PD, It's the same thing .
Okay , Start studying Dapr Of Placement How did it come true .
There is one Placement The program , 2333, Catalog cmd/placement, It's up to him
func main() {
log.Infof("starting Dapr Placement Service -- version %s -- commit %s", version.Version(), version.Commit())
cfg := newConfig()
// Apply options to all loggers.
if err := logger.ApplyOptionsToLoggers(&cfg.loggerOptions); err != nil {
log.Fatal(err)
}
log.Infof("log level set to: %s", cfg.loggerOptions.OutputLevel)
// Initialize dapr metrics for placement.
if err := cfg.metricsExporter.Init(); err != nil {
log.Fatal(err)
}
if err := monitoring.InitMetrics(); err != nil {
log.Fatal(err)
}
// Start Raft cluster.
raftServer := raft.New(cfg.raftID, cfg.raftInMemEnabled, cfg.raftBootStrap, cfg.raftPeers)
if raftServer == nil {
log.Fatal("failed to create raft server.")
}
if err := raftServer.StartRaft(nil); err != nil {
log.Fatalf("failed to start Raft Server: %v", err)
}
// Start Placement gRPC server.
hashing.SetReplicationFactor(cfg.replicationFactor)
apiServer := placement.NewPlacementService(raftServer)
You can see main In the function, you start a raft server, Generally speaking , It shows that we have achieved strong consistency in some aspects of ability .
raft The library uses consul Realized raft, instead of etcd, Because etcd Of raft It's not a library , It can only be a server ( Include etcd embed), You can't customize the protocol inside , You can only use etcd For you client Come and visit him . This point etcd It was very unfriendly .
If you use raft Ku will do it placement, Then the agreement can be customized , You can find Apply Related functions , Because raft State machines are just responsible for log The consistency of , log The message , The processing of information shows state , Apply Functions are where the user needs to process information . Fortunately, I have done it before MIT 6.824 Of lab, A little bit about this .
// Apply log is invoked once a log entry is committed.
func (c *FSM) Apply(log *raft.Log) interface{} {
buf := log.Data
cmdType := CommandType(buf[0])
if log.Index < c.state.Index {
logging.Warnf("old: %d, new index: %d. skip apply", c.state.Index, log.Index)
return nil
}
var err error
var updated bool
switch cmdType {
case MemberUpsert:
updated, err = c.upsertMember(buf[1:])
case MemberRemove:
updated, err = c.removeMember(buf[1:])
default:
err = errors.New("unimplemented command")
}
if err != nil {
return err
}
return updated
}
stay pkg/placement/raft Find... Under the folder raft Related code , fsm.go There are processing functions for messages .
You can see , The processing of messages is very simple , It's just MemberUpsert, and MemberRemove Two messages . FSM The only state stored in the state machine is :
// DaprHostMemberState is the state to store Dapr runtime host and
// consistent hashing tables.
type DaprHostMemberState struct {
// Index is the index number of raft log.
Index uint64
// Members includes Dapr runtime hosts.
Members map[string]*DaprHostMember
// TableGeneration is the generation of hashingTableMap.
// This is increased whenever hashingTableMap is updated.
TableGeneration uint64
// hashingTableMap is the map for storing consistent hashing data
// per Actor types.
hashingTableMap map[string]*hashing.Consistent
}
Obviously , There's only DaprHostMember This useful information , and DaprHostMember It's the nodes in the cluster .
Here we can analyze , Dapr Through Raft Protocol to maintain a strong consistency of Membership, Nothing else .... According to my friend , Follow Orleans It's a little bit similar , It's just Orleans yes AP System .
And then through consistency Hash Analysis of , You can see :
func (a *actorsRuntime) lookupActorAddress(actorType, actorID string) (string, string) {
if a.placementTables == nil {
return "", ""
}
t := a.placementTables.Entries[actorType]
if t == nil {
return "", ""
}
host, err := t.GetHost(actorID)
if err != nil || host == nil {
return "", ""
}
return host.Name, host.AppID
}
Through ActorType and ActorID To consistent Hash Look for host, That one GetHost Implementation is consistency Hash Table implementation .
Actor RPC Call The realization of :
func (a *actorsRuntime) Call(ctx context.Context, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
if a.placementBlock {
<-a.placementSignal
}
actor := req.Actor()
targetActorAddress, appID := a.lookupActorAddress(actor.GetActorType(), actor.GetActorId())
if targetActorAddress == "" {
return nil, errors.Errorf("error finding address for actor type %s with id %s", actor.GetActorType(), actor.GetActorId())
}
var resp *invokev1.InvokeMethodResponse
var err error
if a.isActorLocal(targetActorAddress, a.config.HostAddress, a.config.Port) {
resp, err = a.callLocalActor(ctx, req)
} else {
resp, err = a.callRemoteActorWithRetry(ctx, retry.DefaultLinearRetryCount, retry.DefaultLinearBackoffInterval, a.callRemoteActor, targetActorAddress, appID, req)
}
if err != nil {
return nil, err
}
return resp, nil
}
Through what we saw just now loopupActorAddress Function found Host, And then judge whether it's in the present Host In the host , Otherwise, it's sent to the far end , Optimized for the current host , It's not really chicken , Because in a decentralized system , There are usually many host, At current host The probability within is actually very low .
From this side , We can probably analyze the whole picture , namely Dapr Details of implementing decentralized stateful Services :
1. Through Consul Raft Library maintenance Membership
2. Clustering and Placement Component communication , Get Membership
3. Looking for Actor The algorithm of the algorithm is now Host Inside , instead of Placement Components . Through ActorType Find something that can provide a service Host, And then form a consistency Hash surface , Query in the table Host, And forward the request
Yes Host Internal consistency Hash Table query reference , Found a place to change the content :
func (a *actorsRuntime) updatePlacements(in *placementv1pb.PlacementTables) {
a.placementTableLock.Lock()
defer a.placementTableLock.Unlock()
if in.Version != a.placementTables.Version {
for k, v := range in.Entries {
loadMap := map[string]*hashing.Host{}
for lk, lv := range v.LoadMap {
loadMap[lk] = hashing.NewHost(lv.Name, lv.Id, lv.Load, lv.Port)
}
c := hashing.NewFromExisting(v.Hosts, v.SortedSet, loadMap)
a.placementTables.Entries[k] = c
}
a.placementTables.Version = in.Version
a.drainRebalancedActors()
log.Infof("placement tables updated, version: %s", in.GetVersion())
a.evaluateReminders()
}
}
You can see from these lines of code that , The versions are different , It's all updated , And it's going to be rehash, Namely a.drainRebalanceActors.
If you've studied data structure , Then I must have learned something called HashTable, HashTable It needs to be expanded rehash, You need to build a bigger table, And then put all the elements back in , The location will be very different from the original . And consistency Hash It can solve all the problems rehash Situation of , Let only part of the content rehash, There will be less invalid content .
however , But there's one thing about everything , All the nodes are at the same time rehash It's OK , But how can a decentralized system do everything node All at the same time rehash, Obviously it can't be done , therefore Dapr Maintained Actor Address Catalog , It's ultimately consistent , In other words, there will be multiple ID same Actor( Transient ), It still leads to inconsistencies .
Yes dapr/proto/placement/v1/placement.proto Look at , Verified my conjecture
// Placement service is used to report Dapr runtime host status.
service Placement {
rpc ReportDaprStatus(stream Host) returns (stream PlacementOrder) {}
}
message PlacementOrder {
PlacementTables tables = 1;
string operation = 2;
}
Host Start , Just go to placement Over there through gRPC Stream Subscribe to cluster changes . I'm so lazy , It's the whole thing membership Send it over , Not transmitted diff.
To summarize , From the source code analysis above, we can know that , Dapr Of Membership yes CP System , however Actor Of Placement No , It's a final consensus AP System . and TiDB Of PD It's a CP System , It's just through etcd embed It's done . I hope it can help you a little bit .
It helps me , It could be Dapr For Consul raft Use .
Refer to :
1. Dapr
2. Etcd Embed
3. Consul Raft
&n
版权声明
本文为[itread01]所创,转载请带上原文链接,感谢
边栏推荐
- 一篇文章带你了解HTML表格及其主要属性介绍
- 零基础打造一款属于自己的网页搜索引擎
- 零基础打造一款属于自己的网页搜索引擎
- 用一个例子理解JS函数的底层处理机制
- Discussion on the development practice of aspnetcore, a cross platform framework
- Recommendation system based on deep learning
- Vue.js Mobile end left slide delete component
- How to become a data scientist? - kdnuggets
- In order to save money, I learned PHP in one day!
- 零基础打造一款属于自己的网页搜索引擎
猜你喜欢

Not long after graduation, he earned 20000 yuan from private work!

What to do if you are squeezed by old programmers? I don't want to quit

如何玩转sortablejs-vuedraggable实现表单嵌套拖拽功能

Summary of common algorithms of binary tree

Discussion on the technical scheme of text de duplication (1)

How to encapsulate distributed locks more elegantly

只有1个字节的文件实际占用多少磁盘空间

Did you blog today?
![[C / C + + 1] clion configuration and running C language](/img/5b/ba96ff4447b150f50560e5d47cb8d1.jpg)
[C / C + + 1] clion configuration and running C language

The dynamic thread pool in Kitty supports Nacos and Apollo multi configuration centers
随机推荐
【自学unity2d传奇游戏开发】如何让角色动起来
前端都应懂的入门基础-github基础
axios学习笔记(二):轻松弄懂XHR的使用及如何封装简易axios
What to do if you are squeezed by old programmers? I don't want to quit
5.4 static resource mapping
一篇文章带你了解CSS3图片边框
理解格式化原理
React design pattern: in depth understanding of react & Redux principle
Mac installation hanlp, and win installation and use
前端工程师需要懂的前端面试题(c s s方面)总结(二)
一部完整的游戏,需要制作哪些音乐?
How to get started with new HTML5 (2)
带你学习ES5中新增的方法
Humor: hacker programming is actually similar to machine learning!
Python filtering sensitive word records
新建一个空文件占用多少磁盘空间?
It's so embarrassing, fans broke ten thousand, used for a year!
仅用六种字符来完成Hello World,你能做到吗?
【自学unity2d传奇游戏开发】地图编辑器
ES6学习笔记(五):轻松了解ES6的内置扩展对象