MIT-6.824-Lab4B-2022 (a long-form walkthrough of the ideas and the code)
2022-07-04 23:22:00 【Xingping XP】
Preface
All kinds of things have been piling up lately... this lab ended up taking quite a long time, but fortunately I finished it before my internship started.
1. Experimental background
This is the most comprehensive experiment of the whole lab1–lab4 sequence: there is no paper to guide it, and it combines the Raft consensus algorithm from lab2 with the shard controller from lab3 to build a sharded, fault-tolerant key/value database in lab4.
For the overall structure, refer to the figure I drew:
A client maps the request key through key2shard to a shard and sends the request to the group serving that shard. If the server that receives it is the leader, it runs the operation through its own Raft group, so every replica in the group applies the shard operation in the same order; that is what gives fault tolerance. Shard assignment across the whole cluster is managed by the lab3 controller. The diagram reproduces this architecture as faithfully as my understanding and drawing skills allow; I hope it helps readers, though of course it is for reference only.
2. The client side
The client is almost identical to the earlier labs, and this lab's skeleton already provides most of the client code. All we add is a seqId sequence number to deduplicate retried requests.
// Get fetches the current value for a key.
// Returns "" if the key does not exist.
// Keeps trying forever in the face of all other errors.
func (ck *Clerk) Get(key string) string {
	ck.seqId++
	for {
		args := GetArgs{
			Key:       key,
			ClientId:  ck.clientId,
			RequestId: ck.seqId,
		}
		shard := key2shard(key)
		gid := ck.config.Shards[shard]
		if servers, ok := ck.config.Groups[gid]; ok {
			// try each server in the group that owns the shard.
			for si := 0; si < len(servers); si++ {
				srv := ck.make_end(servers[si])
				var reply GetReply
				ok := srv.Call("ShardKV.Get", &args, &reply)
				if ok && (reply.Err == OK || reply.Err == ErrNoKey) {
					return reply.Value
				}
				if ok && reply.Err == ErrWrongGroup {
					break
				}
				// not ok, or ErrWrongLeader: try the next server.
			}
		}
		time.Sleep(100 * time.Millisecond)
		// ask the controller for the latest configuration.
		ck.config = ck.sm.Query(-1)
	}
}
// PutAppend is shared by Put and Append.
func (ck *Clerk) PutAppend(key string, value string, op string) {
	ck.seqId++
	for {
		args := PutAppendArgs{
			Key:       key,
			Value:     value,
			Op:        Operation(op),
			ClientId:  ck.clientId,
			RequestId: ck.seqId,
		}
		shard := key2shard(key)
		gid := ck.config.Shards[shard]
		if servers, ok := ck.config.Groups[gid]; ok {
			for si := 0; si < len(servers); si++ {
				srv := ck.make_end(servers[si])
				var reply PutAppendReply
				ok := srv.Call("ShardKV.PutAppend", &args, &reply)
				if ok && reply.Err == OK {
					return
				}
				if ok && reply.Err == ErrWrongGroup {
					break
				}
				// not ok, or ErrWrongLeader: try the next server.
			}
		}
		time.Sleep(100 * time.Millisecond)
		// ask the controller for the latest configuration.
		ck.config = ck.sm.Query(-1)
	}
}
- Tip: this can be optimized. Following the convention of the earlier labs, record each group's leaderId so the client wastes far fewer round trips on ErrWrongLeader; see the sketch below.
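A minimal sketch of that optimization, under my own assumptions (not part of the lab skeleton): the Clerk gains a leaderIds map[int]int field, initialized in MakeClerk, that remembers which server index last answered for each gid. Each round starts at the cached leader and updates the cache on success.
// getFromGroup is a hypothetical helper: try the cached leader first,
// walk the group once, and remember whoever finally answers.
func (ck *Clerk) getFromGroup(gid int, servers []string, args *GetArgs) (string, bool) {
	n := len(servers)
	oldLeader := ck.leaderIds[gid] // zero value 0 for an unknown group
	if oldLeader >= n {
		oldLeader = 0 // the group's membership may have shrunk
	}
	for i := oldLeader; ; i = (i + 1) % n {
		var reply GetReply
		ok := ck.make_end(servers[i]).Call("ShardKV.Get", args, &reply)
		if ok && (reply.Err == OK || reply.Err == ErrNoKey) {
			ck.leaderIds[gid] = i // cache the responder for next time
			return reply.Value, true
		}
		if ok && reply.Err == ErrWrongGroup {
			return "", false // caller should refresh the configuration
		}
		// not ok, or ErrWrongLeader: move on, but stop after one full cycle.
		if (i+1)%n == oldLeader {
			return "", false
		}
	}
}
When this returns false, the caller sleeps and re-queries the controller exactly as in the loop above.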
3. The server side
3.1 Initialization
This part is the same as in the previous labs.
func StartServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxraftstate int, gid int, masters []*labrpc.ClientEnd, make_end func(string) *labrpc.ClientEnd) *ShardKV {
	// call labgob.Register on structures you want
	// Go's RPC library to marshall/unmarshall.
	labgob.Register(Op{})

	kv := new(ShardKV)
	kv.me = me
	kv.maxRaftState = maxraftstate
	kv.makeEnd = make_end
	kv.gid = gid
	kv.masters = masters

	// Your initialization code here.
	kv.shardsPersist = make([]Shard, shardctrler.NShards)
	kv.SeqMap = make(map[int64]int)

	// Use something like this to talk to the shardctrler:
	// kv.mck = shardctrler.MakeClerk(kv.masters)
	kv.sck = shardctrler.MakeClerk(kv.masters)
	kv.waitChMap = make(map[int]chan OpReply)

	snapshot := persister.ReadSnapshot()
	if len(snapshot) > 0 {
		kv.DecodeSnapShot(snapshot)
	}

	kv.applyCh = make(chan raft.ApplyMsg)
	kv.rf = raft.Make(servers, me, persister, kv.applyCh)

	go kv.applyMsgHandlerLoop()
	go kv.ConfigDetectedLoop()
	return kv
}
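The post never shows the type definitions this code relies on. The sketch below is reconstructed purely from how each field is used in this post: the names match their uses, but the exact definitions, along with the timeout constants (GetTimeout, UpConfigLoopInterval, and so on, presumably in common.go), are my reconstruction rather than the author's file. It assumes imports of sync, sync/atomic, and the lab's labrpc, raft, and shardctrler packages.
// Shard is one shard's key/value data plus the configuration number at
// which it was installed (used to detect stale copies during migration).
type Shard struct {
	KvMap     map[string]string
	ConfigNum int
}

type Op struct {
	ClientId int64
	SeqId    int
	OpType   Operation
	Key      string
	Value    string
	UpConfig shardctrler.Config // for UpConfigType
	ShardId  int                // for AddShardType / RemoveShardType
	Shard    Shard              // for AddShardType
	SeqMap   map[int64]int      // sender's dedup table, for AddShardType
}

type OpReply struct {
	ClientId int64
	SeqId    int
	Err      Err
}

type ShardKV struct {
	mu           sync.Mutex
	me           int
	rf           *raft.Raft
	applyCh      chan raft.ApplyMsg
	dead         int32 // set by Kill()
	makeEnd      func(string) *labrpc.ClientEnd
	gid          int
	masters      []*labrpc.ClientEnd
	maxRaftState int // snapshot once the raft state grows this large

	sck           *shardctrler.Clerk   // client of the shard controller
	Config        shardctrler.Config   // current configuration
	LastConfig    shardctrler.Config   // previous configuration
	shardsPersist []Shard              // per-shard data, indexed by shard id
	SeqMap        map[int64]int        // clientId -> largest applied seqId
	waitChMap     map[int]chan OpReply // raft log index -> notify channel
}

func (kv *ShardKV) killed() bool {
	return atomic.LoadInt32(&kv.dead) == 1
}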
3.2 The loops
Compared with the previous labs, besides the loop that applies applyMsg we also need a loop that detects configuration updates. First, the familiar apply loop:
// applyMsgHandlerLoop handles the ApplyMsg values delivered on applyCh.
func (kv *ShardKV) applyMsgHandlerLoop() {
	for {
		if kv.killed() {
			return
		}
		select {
		case msg := <-kv.applyCh:
			if msg.CommandValid {
				kv.mu.Lock()
				op := msg.Command.(Op)
				reply := OpReply{
					ClientId: op.ClientId,
					SeqId:    op.SeqId,
					Err:      OK,
				}

				if op.OpType == PutType || op.OpType == GetType || op.OpType == AppendType {
					shardId := key2shard(op.Key)

					if kv.Config.Shards[shardId] != kv.gid {
						reply.Err = ErrWrongGroup
					} else if kv.shardsPersist[shardId].KvMap == nil {
						// the shard should live here but its data has not arrived yet
						reply.Err = ShardNotArrived
					} else {
						if !kv.ifDuplicate(op.ClientId, op.SeqId) {
							kv.SeqMap[op.ClientId] = op.SeqId
							switch op.OpType {
							case PutType:
								kv.shardsPersist[shardId].KvMap[op.Key] = op.Value
							case AppendType:
								kv.shardsPersist[shardId].KvMap[op.Key] += op.Value
							case GetType:
								// a Get changes nothing
							default:
								log.Fatalf("invalid command type: %v.", op.OpType)
							}
						}
					}
				} else {
					// a request originating from a server of another group
					switch op.OpType {
					case UpConfigType:
						kv.upConfigHandler(op)
					case AddShardType:
						// if our config number is still smaller than the op's SeqId,
						// we have not yet caught up to the config this shard belongs to
						if kv.Config.Num < op.SeqId {
							reply.Err = ConfigNotArrived
							break
						}
						kv.addShardHandler(op)
					case RemoveShardType:
						// the remove was initiated by an earlier UpConfig
						kv.removeShardHandler(op)
					default:
						log.Fatalf("invalid command type: %v.", op.OpType)
					}
				}

				// take a snapshot if enabled and the raft state has outgrown maxRaftState
				if kv.maxRaftState != -1 && kv.rf.GetRaftStateSize() > kv.maxRaftState {
					snapshot := kv.PersistSnapShot()
					kv.rf.Snapshot(msg.CommandIndex, snapshot)
				}

				ch := kv.getWaitCh(msg.CommandIndex)
				ch <- reply
				kv.mu.Unlock()
			}

			if msg.SnapshotValid {
				if kv.rf.CondInstallSnapshot(msg.SnapshotTerm, msg.SnapshotIndex, msg.Snapshot) {
					// install the snapshot's state
					kv.mu.Lock()
					kv.DecodeSnapShot(msg.Snapshot)
					kv.mu.Unlock()
				}
				continue
			}
		}
	}
}
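The loop above also calls an ifDuplicate helper that is not shown in the post. A minimal sketch, assuming SeqMap holds the largest SeqId already applied per client and that the caller holds kv.mu:
// ifDuplicate reports whether this client request was already applied.
// The caller must hold kv.mu.
func (kv *ShardKV) ifDuplicate(clientId int64, seqId int) bool {
	lastSeqId, ok := kv.SeqMap[clientId]
	if !ok {
		return false
	}
	return seqId <= lastSeqId
}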
What is new here is the consensus within a group on its shards (adding them and GCing them), and the consensus on configuration updates.
- Next comes the loop that detects the latest configuration.
The configuration-update flow can be pictured as in the figure below, and the resulting cluster-wide shard migration as in the figure after it. After detecting a configuration update, a group:
- 1. hands off the shards that no longer belong to it;
- 2. on receiving a shard from another group, commits AddShards so the whole group installs the shard in sync;
- 3. GCs a shard once it has been handed off successfully, or the attempt has timed out.
// ConfigDetectedLoop polls for configuration changes and drives shard migration.
func (kv *ShardKV) ConfigDetectedLoop() {
	kv.mu.Lock()
	curConfig := kv.Config
	rf := kv.rf
	kv.mu.Unlock()

	for !kv.killed() {
		// only the leader needs to deal with configuration tasks
		if _, isLeader := rf.GetState(); !isLeader {
			time.Sleep(UpConfigLoopInterval)
			continue
		}
		kv.mu.Lock()

		// check whether all shards that no longer belong to us have been handed off
		if !kv.allSent() {
			SeqMap := make(map[int64]int)
			for k, v := range kv.SeqMap {
				SeqMap[k] = v
			}
			for shardId, gid := range kv.LastConfig.Shards {
				// send every shard we owned in the last config but do not own in the
				// current one, unless it was already sent (its ConfigNum is up to date)
				if gid == kv.gid && kv.Config.Shards[shardId] != kv.gid &&
					kv.shardsPersist[shardId].ConfigNum < kv.Config.Num {

					sendDate := kv.cloneShard(kv.Config.Num, kv.shardsPersist[shardId].KvMap)
					args := SendShardArg{
						LastAppliedRequestId: SeqMap,
						ShardId:              shardId,
						Shard:                sendDate,
						ClientId:             int64(gid),
						RequestId:            kv.Config.Num,
					}

					// shardId -> gid -> server names
					serversList := kv.Config.Groups[kv.Config.Shards[shardId]]
					servers := make([]*labrpc.ClientEnd, len(serversList))
					for i, name := range serversList {
						servers[i] = kv.makeEnd(name)
					}

					// Spawn a goroutine to push the shard to the target group.
					// (The RPC goes to another group; our own group then needs a
					// Raft round to record the resulting state change.)
					go func(servers []*labrpc.ClientEnd, args *SendShardArg) {
						index := 0
						start := time.Now()
						for {
							var reply AddShardReply
							// ask the target group to install (and replicate) the shard
							ok := servers[index].Call("ShardKV.AddShard", args, &reply)

							// whether the shard was delivered successfully or we timed
							// out, we must GC the shard that no longer belongs to us
							if (ok && reply.Err == OK) || time.Since(start) >= 5*time.Second {
								kv.mu.Lock()
								command := Op{
									OpType:   RemoveShardType,
									ClientId: int64(kv.gid),
									SeqId:    kv.Config.Num,
									ShardId:  args.ShardId,
								}
								kv.mu.Unlock()
								kv.startCommand(command, RemoveShardsTimeout)
								break
							}
							index = (index + 1) % len(servers)
							if index == 0 {
								time.Sleep(UpConfigLoopInterval)
							}
						}
					}(servers, &args)
				}
			}
			kv.mu.Unlock()
			time.Sleep(UpConfigLoopInterval)
			continue
		}
		if !kv.allReceived() {
			kv.mu.Unlock()
			time.Sleep(UpConfigLoopInterval)
			continue
		}

		// the current configuration is fully applied; poll for the next one
		curConfig = kv.Config
		sck := kv.sck
		kv.mu.Unlock()

		newConfig := sck.Query(curConfig.Num + 1)
		if newConfig.Num != curConfig.Num+1 {
			time.Sleep(UpConfigLoopInterval)
			continue
		}

		command := Op{
			OpType:   UpConfigType,
			ClientId: int64(kv.gid),
			SeqId:    newConfig.Num,
			UpConfig: newConfig,
		}
		kv.startCommand(command, UpConfigTimeout)
	}
}
- Tip: deduplication is needed here too. Whereas client RPCs dedupe via the client's self-incrementing seqId, configuration-related operations simply use the configuration number: every operation triggered by a configuration update carries the number of the latest configuration, which only ever increases.
3.3 The RPC part
This part is implemented much like the previous labs: each RPC is turned into an Op and run through the group's Raft instance, so the whole group reaches consensus on its shards and configuration.
//------------------------------------------------------ RPC part ----------------------------------------------------------
func (kv *ShardKV) Get(args *GetArgs, reply *GetReply) {
	shardId := key2shard(args.Key)
	kv.mu.Lock()
	if kv.Config.Shards[shardId] != kv.gid {
		reply.Err = ErrWrongGroup
	} else if kv.shardsPersist[shardId].KvMap == nil {
		reply.Err = ShardNotArrived
	}
	kv.mu.Unlock()
	if reply.Err == ErrWrongGroup || reply.Err == ShardNotArrived {
		return
	}

	command := Op{
		OpType:   GetType,
		ClientId: args.ClientId,
		SeqId:    args.RequestId,
		Key:      args.Key,
	}
	err := kv.startCommand(command, GetTimeout)
	if err != OK {
		reply.Err = err
		return
	}

	// re-check ownership after the command commits: the config may have moved on
	kv.mu.Lock()
	if kv.Config.Shards[shardId] != kv.gid {
		reply.Err = ErrWrongGroup
	} else if kv.shardsPersist[shardId].KvMap == nil {
		reply.Err = ShardNotArrived
	} else {
		reply.Err = OK
		reply.Value = kv.shardsPersist[shardId].KvMap[args.Key]
	}
	kv.mu.Unlock()
}

func (kv *ShardKV) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {
	shardId := key2shard(args.Key)
	kv.mu.Lock()
	if kv.Config.Shards[shardId] != kv.gid {
		reply.Err = ErrWrongGroup
	} else if kv.shardsPersist[shardId].KvMap == nil {
		reply.Err = ShardNotArrived
	}
	kv.mu.Unlock()
	if reply.Err == ErrWrongGroup || reply.Err == ShardNotArrived {
		return
	}

	command := Op{
		OpType:   args.Op,
		ClientId: args.ClientId,
		SeqId:    args.RequestId,
		Key:      args.Key,
		Value:    args.Value,
	}
	reply.Err = kv.startCommand(command, AppOrPutTimeout)
}

// AddShard moves a shard from the caller's group to this server's group.
func (kv *ShardKV) AddShard(args *SendShardArg, reply *AddShardReply) {
	command := Op{
		OpType:   AddShardType,
		ClientId: args.ClientId,
		SeqId:    args.RequestId,
		ShardId:  args.ShardId,
		Shard:    args.Shard,
		SeqMap:   args.LastAppliedRequestId,
	}
	reply.Err = kv.startCommand(command, AddShardsTimeout)
}
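All of these RPCs funnel through a startCommand helper (paired with the getWaitCh used by the apply loop) that the post never shows. Below is a minimal sketch under my own assumptions rather than the author's exact code: one buffered reply channel per raft log index, and error constants such as ErrInconsistentData and ErrOverTime assumed to exist in common.go alongside ErrWrongLeader.
// getWaitCh returns (creating if necessary) the notification channel for a
// raft log index. The caller must hold kv.mu. The buffer of 1 lets the apply
// loop deliver a reply without blocking even if the RPC already timed out.
func (kv *ShardKV) getWaitCh(index int) chan OpReply {
	ch, ok := kv.waitChMap[index]
	if !ok {
		ch = make(chan OpReply, 1)
		kv.waitChMap[index] = ch
	}
	return ch
}

// startCommand submits an Op to raft and waits for the apply loop to report
// the outcome, or gives up after the given timeout.
func (kv *ShardKV) startCommand(command Op, timeout time.Duration) Err {
	kv.mu.Lock()
	index, _, isLeader := kv.rf.Start(command)
	if !isLeader {
		kv.mu.Unlock()
		return ErrWrongLeader
	}
	ch := kv.getWaitCh(index)
	kv.mu.Unlock()

	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case re := <-ch:
		kv.mu.Lock()
		delete(kv.waitChMap, index)
		kv.mu.Unlock()
		// a different op committed at our index: we lost leadership mid-flight
		if re.SeqId != command.SeqId || re.ClientId != command.ClientId {
			return ErrInconsistentData
		}
		return re.Err
	case <-timer.C:
		return ErrOverTime
	}
}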
3.5 The handler part
These are the handlers invoked from applyMsgHandlerLoop for shard movement, configuration updates, and related operations.
//------------------------------------------------------ handler part ------------------------------------------------------
// upConfigHandler installs a newer config as the current one.
func (kv *ShardKV) upConfigHandler(op Op) {
	curConfig := kv.Config
	upConfig := op.UpConfig
	if curConfig.Num >= upConfig.Num {
		return
	}
	for shard, gid := range upConfig.Shards {
		if gid == kv.gid && curConfig.Shards[shard] == 0 {
			// the new config assigns us a shard that was previously
			// unassigned (gid 0): nothing to migrate, just create it
			kv.shardsPersist[shard].KvMap = make(map[string]string)
			kv.shardsPersist[shard].ConfigNum = upConfig.Num
		}
	}
	kv.LastConfig = curConfig
	kv.Config = upConfig
}

func (kv *ShardKV) addShardHandler(op Op) {
	// the shard is already installed, or the command is outdated
	if kv.shardsPersist[op.ShardId].KvMap != nil || op.Shard.ConfigNum < kv.Config.Num {
		return
	}
	kv.shardsPersist[op.ShardId] = kv.cloneShard(op.Shard.ConfigNum, op.Shard.KvMap)

	// merge the sender's dedup table so migrated clients are not re-applied
	for clientId, seqId := range op.SeqMap {
		if r, ok := kv.SeqMap[clientId]; !ok || r < seqId {
			kv.SeqMap[clientId] = seqId
		}
	}
}

func (kv *ShardKV) removeShardHandler(op Op) {
	// an outdated remove: a newer config has already been installed
	if op.SeqId < kv.Config.Num {
		return
	}
	kv.shardsPersist[op.ShardId].KvMap = nil
	kv.shardsPersist[op.ShardId].ConfigNum = op.SeqId
}
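Both the migration loop and addShardHandler rely on a cloneShard helper that does not appear in the post. A minimal sketch of what it must do: deep-copy the shard's map, so the copy sent to another group never aliases our live data.
// cloneShard wraps a deep copy of a shard's data together with the config
// number it is being migrated under.
func (kv *ShardKV) cloneShard(configNum int, kvMap map[string]string) Shard {
	migrated := Shard{
		KvMap:     make(map[string]string, len(kvMap)),
		ConfigNum: configNum,
	}
	for k, v := range kvMap {
		migrated.KvMap[k] = v
	}
	return migrated
}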
3.6 The snapshot part
This part is implemented the same way as in the previous labs; refer to my earlier lab posts, so I will not go over it again here.
// PersistSnapShot encodes the kv server state that must survive a snapshot.
// (Note: checking every Encode call, rather than only the last one, so that
// no encoding error is silently dropped.)
func (kv *ShardKV) PersistSnapShot() []byte {
	w := new(bytes.Buffer)
	e := labgob.NewEncoder(w)
	if e.Encode(kv.shardsPersist) != nil ||
		e.Encode(kv.SeqMap) != nil ||
		e.Encode(kv.maxRaftState) != nil ||
		e.Encode(kv.Config) != nil ||
		e.Encode(kv.LastConfig) != nil {
		log.Fatalf("[%d-%d] fails to take snapshot.", kv.gid, kv.me)
	}
	return w.Bytes()
}
// DecodeSnapShot installs a given snapshot.
func (kv *ShardKV) DecodeSnapShot(snapshot []byte) {
	if snapshot == nil || len(snapshot) < 1 {
		// bootstrap without any state
		return
	}
	r := bytes.NewBuffer(snapshot)
	d := labgob.NewDecoder(r)

	var shardsPersist []Shard
	var SeqMap map[int64]int
	var MaxRaftState int
	var Config, LastConfig shardctrler.Config

	if d.Decode(&shardsPersist) != nil || d.Decode(&SeqMap) != nil ||
		d.Decode(&MaxRaftState) != nil || d.Decode(&Config) != nil ||
		d.Decode(&LastConfig) != nil {
		log.Fatalf("[Server(%v)] Failed to decode snapshot!!!", kv.me)
	} else {
		kv.shardsPersist = shardsPersist
		kv.SeqMap = SeqMap
		kv.maxRaftState = MaxRaftState
		kv.Config = Config
		kv.LastConfig = LastConfig
	}
}
4. Lab musings
On the lab's challenges:
- For a distributed, concurrent system as a whole, I think the way to keep results correct is to ensure that a group of operations, however many times and at whatever moments it is invoked, produces the expected sequence unaffected by external factors such as timing. It reminds me of rereading "The Art of Concurrent Programming in Java" recently: the happens-before rules create an illusion for programmers, that a correctly synchronized multithreaded program executes in the order happens-before specifies. However the compiler and processor reorder and optimize under the hood, the final result matches what you expected. Looking at this lab as a whole, I prefer to describe the goal as keeping the entire system linearizable: once linearizability holds, the system may look concurrent, but it effectively behaves serially.
For this lab's challenge, all we need is to pin down the concrete migration flow for shards:
Define a linear control flow, and the challenge follows from it.
// allSent reports whether every shard we lost in the new config has been
// handed off: a shard that was ours in the last config, is not ours now,
// and whose persisted config number still lags has not been sent yet.
func (kv *ShardKV) allSent() bool {
	for shard, gid := range kv.LastConfig.Shards {
		if gid == kv.gid && kv.Config.Shards[shard] != kv.gid &&
			kv.shardsPersist[shard].ConfigNum < kv.Config.Num {
			return false
		}
	}
	return true
}

// allReceived reports whether every shard we gained in the new config has
// arrived, by the symmetric check.
func (kv *ShardKV) allReceived() bool {
	for shard, gid := range kv.LastConfig.Shards {
		if gid != kv.gid && kv.Config.Shards[shard] == kv.gid &&
			kv.shardsPersist[shard].ConfigNum < kv.Config.Num {
			return false
		}
	}
	return true
}
Summary
With this lab done, the whole 6.824 lab series is officially finished. If I find time later, I may write up my understanding of distributed systems more broadly. I have to say the lab series is genuinely hard, often headache-inducing, which only deepens my admiration for the people who designed it. Part of my original motivation for this series of posts was my own future convenience, but with more and more students taking on 6.824 lately (and my diagrams getting more practiced), I also hope the blog helps readers. These are of course only my shallow opinions, for reference only, and there will be misunderstandings in places; corrections are welcome~
- Test screenshot:
(This test run is within about 100s of the lab's reference timing, and the client code has not been rewritten, so there is still room to optimize, but I really have no time lately orz... busy with all kinds of things. Under concurrency the unreliable tests sometimes fail; I have not had time to look into that yet and will sort it out later.)
The code is attached on gitee for reference only: Mit6.824-2022