
MIT 6.824 Lab 4B (2022): a 10,000-word walkthrough of the ideas and code

2022-07-04 23:22:00 Xingping XP


Preface

A lot has been going on recently… This final lab also took quite a long time, but fortunately I finished it before my internship.

1. Experimental background

This experiment is the most comprehensive of lab1~4: there is no paper to guide it, and it combines the Raft consensus algorithm from lab2 with the shard controller from lab3 to build a sharded, fault-tolerant key/value database in lab4.
For the overall structure of the experiment, refer to the figure drawn by the author:
[Figure: overall architecture of the sharded KV service]
The client maps the requested key to a shard with key2shard, which assigns it to a specific group of servers. If the server that receives the request is the leader of its group, it uses its own Raft group to reach consensus, so that the operation on that shard is replicated across the whole group and stays fault-tolerant; the grouping of the whole system is managed by the shard controller from lab3. The diagram reproduces this architecture as faithfully as the author's understanding and drawing ability allow, and hopefully it helps readers; of course it is for reference only.

2. The client side

The client requirements are almost the same as in the previous labs, and most of the client code carries over from them; we only add a seqId sequence number to guarantee deduplication.

// Get GetType
// fetch the current value for a key.
// returns "" if the key does not exist.
// keeps trying forever in the face of all other errors.
// You will have to modify this function.
//
func (ck *Clerk) Get(key string) string {
	ck.seqId++

	for {
		args := GetArgs{
			Key:       key,
			ClientId:  ck.clientId,
			RequestId: ck.seqId,
		}
		shard := key2shard(key)
		gid := ck.config.Shards[shard]
		if servers, ok := ck.config.Groups[gid]; ok {
			// try each server for the shard.
			for si := 0; si < len(servers); si++ {
				srv := ck.make_end(servers[si])
				var reply GetReply
				//fmt.Printf("[ ++++Client[%v]++++] : send a GetType,args:%+v,serverId[%v]\n", ck.clientId, args, si)
				ok := srv.Call("ShardKV.Get", &args, &reply)
				if ok {
					//fmt.Printf("[ ++++Client[%v]++++] : receive a GetType,args:%+v ,replys:%+v ,serverId[%v]\n", ck.clientId, args, reply, si)
				} else {
					//fmt.Printf("[ ++++Client[%v]++++] : Ask Err:args:%+v\n", ck.clientId, args)
				}

				if ok && (reply.Err == OK || reply.Err == ErrNoKey) {
					return reply.Value
				}
				if ok && (reply.Err == ErrWrongGroup) {
					break
				}
				// ... not ok, or ErrWrongLeader
			}
		}
		time.Sleep(100 * time.Millisecond)
		// ask controler for the latest configuration.
		ck.config = ck.sm.Query(-1)
	}
}

// PutAppend
// shared by Put and Append.
// You will have to modify this function.
//
func (ck *Clerk) PutAppend(key string, value string, op string) {
	ck.seqId++

	for {
		args := PutAppendArgs{
			Key:       key,
			Value:     value,
			Op:        Operation(op),
			ClientId:  ck.clientId,
			RequestId: ck.seqId,
		}
		shard := key2shard(key)
		gid := ck.config.Shards[shard]
		if servers, ok := ck.config.Groups[gid]; ok {
			for si := 0; si < len(servers); si++ {
				srv := ck.make_end(servers[si])
				var reply PutAppendReply
				//fmt.Printf("[ ++++Client[%v]++++] : send a Put,args:%+v,serverId[%v]\n", ck.clientId, args, si)
				ok := srv.Call("ShardKV.PutAppend", &args, &reply)
				if ok {
					//fmt.Printf("[ ++++Client[%v]++++] : receive a Put,args:%+v ,replys:%+v ,serverId[%v]\n", ck.clientId, args, reply, si)
				} else {
					//fmt.Printf("[ ++++Client[%v]++++] : Ask Err:args:%+v\n", ck.clientId, args)
				}
				if ok && reply.Err == OK {
					return
				}
				if ok && reply.Err == ErrWrongGroup {
					break
				}
				// ... not ok, or ErrWrongLeader
			}
		}

		time.Sleep(100 * time.Millisecond)
		// ask controler for the latest configuration.
		ck.config = ck.sm.Query(-1)
	}
}
  • Tip: this can be optimized in the traditional way, by remembering the last known leaderId, so that far fewer requests hit the wrong leader (a sketch of this idea follows below).
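A minimal sketch of that optimization, assuming a hypothetical per-group cache (none of these names appear in the lab code above): remember which server index answered successfully last time and start each retry round from it.

// Hypothetical sketch of the leader-caching optimization; none of these names
// come from the lab code shown in this post.
type leaderCache struct {
	last map[int]int // gid -> index of the server that last answered successfully
}

// start returns the server index to try first for a group with n servers.
func (lc *leaderCache) start(gid, n int) int {
	if i, ok := lc.last[gid]; ok && i < n {
		return i
	}
	return 0
}

// remember records which server index worked for this group.
func (lc *leaderCache) remember(gid, index int) {
	if lc.last == nil {
		lc.last = make(map[int]int)
	}
	lc.last[gid] = index
}

Inside Get/PutAppend the inner loop would then iterate si := (start + off) % len(servers) and call remember(gid, si) whenever a reply comes back with Err == OK.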

3. The server side

3.1 Initialization

This part is much the same as in the previous labs.

func StartServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxraftstate int, gid int, masters []*labrpc.ClientEnd, make_end func(string) *labrpc.ClientEnd) *ShardKV {
	// call labgob.Register on structures you want
	// Go's RPC library to marshall/unmarshall.
	labgob.Register(Op{})

	kv := new(ShardKV)
	kv.me = me
	kv.maxRaftState = maxraftstate
	kv.makeEnd = make_end
	kv.gid = gid
	kv.masters = masters

	// Your initialization code here.

	kv.shardsPersist = make([]Shard, shardctrler.NShards)

	kv.SeqMap = make(map[int64]int)

	// Use something like this to talk to the shardctrler:
	// kv.mck = shardctrler.MakeClerk(kv.masters)
	kv.sck = shardctrler.MakeClerk(kv.masters)
	kv.waitChMap = make(map[int]chan OpReply)

	snapshot := persister.ReadSnapshot()
	if len(snapshot) > 0 {
		kv.DecodeSnapShot(snapshot)
	}

	kv.applyCh = make(chan raft.ApplyMsg)
	kv.rf = raft.Make(servers, me, persister, kv.applyCh)

	go kv.applyMsgHandlerLoop()
	go kv.ConfigDetectedLoop()

	return kv
}
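The initialization above assumes roughly the following server state. This is a sketch reconstructed from how the fields are used throughout this post; the exact struct in the author's repository may differ.

// Sketch of the server state assumed by the snippets in this post (reconstructed
// from usage; the author's actual definitions may differ).
type Shard struct {
	KvMap     map[string]string // the shard's key/value data; nil means the shard has not arrived
	ConfigNum int               // config number of the last migration that touched this shard
}

type ShardKV struct {
	mu           sync.Mutex
	me           int
	rf           *raft.Raft
	applyCh      chan raft.ApplyMsg
	makeEnd     func(string) *labrpc.ClientEnd
	gid          int
	masters      []*labrpc.ClientEnd
	maxRaftState int   // snapshot when the raft state grows past this size; -1 disables snapshots
	dead         int32 // set by Kill(), read by killed()

	sck           *shardctrler.Clerk   // clerk for talking to the shard controller
	Config        shardctrler.Config   // the configuration currently being served
	LastConfig    shardctrler.Config   // the previous configuration, needed during migration
	shardsPersist []Shard              // shard data, indexed by shard id
	SeqMap        map[int64]int        // clientId -> largest seqId applied (deduplication)
	waitChMap     map[int]chan OpReply // raft log index -> channel the RPC handler waits on
}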

3.2 Loop part

Compared with the previous labs, besides the applyMsg loop this lab also needs a loop that detects configuration updates. Let's look at the applyMsg loop first.
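The loop type-asserts every committed entry into an Op and answers the waiting RPC handler through an OpReply. Neither type is shown in the post; the sketch below reconstructs them from how they are used (names follow the snippets, the real repository may differ).

// Reconstructed from usage in this post; the author's actual definitions may differ.
type Operation string

const (
	GetType         Operation = "Get"
	PutType         Operation = "Put"
	AppendType      Operation = "Append"
	UpConfigType    Operation = "UpConfig"    // install a new configuration
	AddShardType    Operation = "AddShard"    // install a shard received from another group
	RemoveShardType Operation = "RemoveShard" // garbage-collect a shard that was handed over
)

type Err string

const (
	OK               Err = "OK"
	ErrNoKey         Err = "ErrNoKey"
	ErrWrongGroup    Err = "ErrWrongGroup"
	ErrWrongLeader   Err = "ErrWrongLeader"
	ShardNotArrived  Err = "ShardNotArrived"  // the shard's data has not been migrated here yet
	ConfigNotArrived Err = "ConfigNotArrived" // this group has not caught up with that config yet
)

// Op is the single command type replicated through raft.
type Op struct {
	OpType   Operation
	ClientId int64
	SeqId    int // client request id, or the config number for config/shard operations
	Key      string
	Value    string
	UpConfig shardctrler.Config // UpConfigType
	ShardId  int                // AddShardType / RemoveShardType
	Shard    Shard              // AddShardType
	SeqMap   map[int64]int      // sender's dedup table, AddShardType
}

// OpReply is what the apply loop hands back to the waiting RPC handler.
type OpReply struct {
	ClientId int64
	SeqId    int
	Err      Err
}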

// applyMsgHandlerLoop handles the ApplyMsg values delivered on applyCh.
func (kv *ShardKV) applyMsgHandlerLoop() {
	for {
		if kv.killed() {
			return
		}
		select {

		case msg := <-kv.applyCh:

			if msg.CommandValid {
				kv.mu.Lock()
				op := msg.Command.(Op)
				reply := OpReply{
					ClientId: op.ClientId,
					SeqId:    op.SeqId,
					Err:      OK,
				}

				if op.OpType == PutType || op.OpType == GetType || op.OpType == AppendType {

					shardId := key2shard(op.Key)

					if kv.Config.Shards[shardId] != kv.gid {
						reply.Err = ErrWrongGroup
					} else if kv.shardsPersist[shardId].KvMap == nil {
						// the shard should be served here but its data has not arrived yet
						reply.Err = ShardNotArrived
					} else {
						if !kv.ifDuplicate(op.ClientId, op.SeqId) {
							kv.SeqMap[op.ClientId] = op.SeqId
							switch op.OpType {
							case PutType:
								kv.shardsPersist[shardId].KvMap[op.Key] = op.Value
							case AppendType:
								kv.shardsPersist[shardId].KvMap[op.Key] += op.Value
							case GetType:
								// a Get does not modify anything
							default:
								log.Fatalf("invalid command type: %v.", op.OpType)
							}
						}
					}
				} else {
					// request from a server of another group
					switch op.OpType {

					case UpConfigType:
						kv.upConfigHandler(op)
					case AddShardType:
						// if our config number is smaller than the op's SeqId,
						// we have not caught up with the latest configuration yet
						if kv.Config.Num < op.SeqId {
							reply.Err = ConfigNotArrived
							break
						}
						kv.addShardHandler(op)
					case RemoveShardType:
						// the remove operation comes from an earlier UpConfig
						kv.removeShardHandler(op)
					default:
						log.Fatalf("invalid command type: %v.", op.OpType)
					}
				}

				// take a snapshot if snapshotting is enabled and the raft state has grown too large
				if kv.maxRaftState != -1 && kv.rf.GetRaftStateSize() > kv.maxRaftState {
					snapshot := kv.PersistSnapShot()
					kv.rf.Snapshot(msg.CommandIndex, snapshot)
				}

				ch := kv.getWaitCh(msg.CommandIndex)
				ch <- reply
				kv.mu.Unlock()
			}

			if msg.SnapshotValid {
				if kv.rf.CondInstallSnapshot(msg.SnapshotTerm, msg.SnapshotIndex, msg.Snapshot) {
					// install the snapshot data
					kv.mu.Lock()
					kv.DecodeSnapShot(msg.Snapshot)
					kv.mu.Unlock()
				}
				continue
			}

		}
	}
}
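The loop relies on two small helpers that are not shown in the post, ifDuplicate and getWaitCh. Minimal sketches of what they presumably look like, based only on how they are called (the buffered channel is an assumption):

// ifDuplicate reports whether this (clientId, seqId) pair has already been applied.
// Sketch based on how it is called above; call with kv.mu held.
func (kv *ShardKV) ifDuplicate(clientId int64, seqId int) bool {
	lastSeqId, exist := kv.SeqMap[clientId]
	if !exist {
		return false
	}
	return seqId <= lastSeqId
}

// getWaitCh returns the channel on which the RPC handler waits for the result of
// the command at the given raft log index, creating it if necessary. A buffered
// channel (an assumption) keeps the apply loop from blocking if the waiting
// handler has already timed out.
func (kv *ShardKV) getWaitCh(index int) chan OpReply {
	ch, exist := kv.waitChMap[index]
	if !exist {
		kv.waitChMap[index] = make(chan OpReply, 1)
		ch = kv.waitChMap[index]
	}
	return ch
}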

What is new compared with the earlier labs is the consensus on shard movement within a group (adding shards and garbage-collecting them), as well as the consensus on configuration updates.

  • Next comes the loop that detects the latest configuration:

The configuration-update process can be pictured as follows:

[Figure: configuration update flow]

  • Shard migration across the whole cluster can be pictured like this:
    [Figure: shard migration between groups]
    When the loop finds that the configuration has changed:
  • 1. Hand over the shards that no longer belong to this group.
  • 2. On receiving shards from another group, replicate an AddShard command so the whole group installs them.
  • 3. Once a shard has been handed over successfully, or the transfer times out, garbage-collect it locally.
// ConfigDetectedLoop detects configuration changes and drives shard migration.
func (kv *ShardKV) ConfigDetectedLoop() {
	kv.mu.Lock()

	curConfig := kv.Config
	rf := kv.rf
	kv.mu.Unlock()

	for !kv.killed() {
		// only leader needs to deal with configuration tasks
		if _, isLeader := rf.GetState(); !isLeader {
			time.Sleep(UpConfigLoopInterval)
			continue
		}
		kv.mu.Lock()

		// check whether every shard that no longer belongs to us has been handed over
		if !kv.allSent() {
			SeqMap := make(map[int64]int)
			for k, v := range kv.SeqMap {
				SeqMap[k] = v
			}
			for shardId, gid := range kv.LastConfig.Shards {

				// hand over a shard that we owned in the last config but do not own in the latest one
				if gid == kv.gid && kv.Config.Shards[shardId] != kv.gid && kv.shardsPersist[shardId].ConfigNum < kv.Config.Num {

					sendData := kv.cloneShard(kv.Config.Num, kv.shardsPersist[shardId].KvMap)

					args := SendShardArg{
						LastAppliedRequestId: SeqMap,
						ShardId:              shardId,
						Shard:                sendData,
						ClientId:             int64(gid),
						RequestId:            kv.Config.Num,
					}

					// shardId -> gid -> server names
					serversList := kv.Config.Groups[kv.Config.Shards[shardId]]
					servers := make([]*labrpc.ClientEnd, len(serversList))
					for i, name := range serversList {
						servers[i] = kv.makeEnd(name)
					}

					// start a goroutine to push the shard to the target group (the RPC goes to
					// the other group; our own group records the resulting state change through raft)
					go func(servers []*labrpc.ClientEnd, args *SendShardArg) {

						index := 0
						start := time.Now()
						for {
							var reply AddShardReply
							// ask the target group to add the shard; it reaches consensus internally
							ok := servers[index].Call("ShardKV.AddShard", args, &reply)

							// if the shard was handed over successfully, or the transfer timed out,
							// garbage-collect the shard that no longer belongs to us
							if ok && reply.Err == OK || time.Now().Sub(start) >= 5*time.Second {

								kv.mu.Lock()
								command := Op{
									OpType:   RemoveShardType,
									ClientId: int64(kv.gid),
									SeqId:    kv.Config.Num,
									ShardId:  args.ShardId,
								}
								kv.mu.Unlock()
								kv.startCommand(command, RemoveShardsTimeout)
								break
							}
							index = (index + 1) % len(servers)
							if index == 0 {
								time.Sleep(UpConfigLoopInterval)
							}
						}
					}(servers, &args)
				}
			}
			kv.mu.Unlock()
			time.Sleep(UpConfigLoopInterval)
			continue
		}
		if !kv.allReceived() {
			kv.mu.Unlock()
			time.Sleep(UpConfigLoopInterval)
			continue
		}

		// the current configuration has been fully processed; poll for the next one
		curConfig = kv.Config
		sck := kv.sck
		kv.mu.Unlock()

		newConfig := sck.Query(curConfig.Num + 1)
		if newConfig.Num != curConfig.Num+1 {
			time.Sleep(UpConfigLoopInterval)
			continue
		}

		command := Op{
			OpType:   UpConfigType,
			ClientId: int64(kv.gid),
			SeqId:    newConfig.Num,
			UpConfig: newConfig,
		}
		kv.startCommand(command, UpConfigTimeout)
	}
}
  • Tip: deduplication is needed here as well. Whereas the client-facing RPCs deduplicate with the client's self-incrementing seqId, the configuration-related operations simply use the configuration number: as soon as the configuration is updated, the whole series of subsequent operations is tied to the latest config number.
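The loops and RPC handlers in this post also reference a few timing constants that are never shown (UpConfigLoopInterval, GetTimeout, AppOrPutTimeout, UpConfigTimeout, AddShardsTimeout, RemoveShardsTimeout). The values below are only plausible placeholders, not the author's actual choices:

// Placeholder values for the timing constants referenced in this post; the
// author's real values are not shown and may well differ.
const (
	UpConfigLoopInterval = 100 * time.Millisecond // how often the config-detection loop wakes up

	GetTimeout          = 500 * time.Millisecond // how long an RPC handler waits for raft to apply a command
	AppOrPutTimeout     = 500 * time.Millisecond
	UpConfigTimeout     = 500 * time.Millisecond
	AddShardsTimeout    = 500 * time.Millisecond
	RemoveShardsTimeout = 500 * time.Millisecond
)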

3.3 Raft RPC part

This part is implemented almost the same way as in the previous lab: raft is used to reach consensus within the group on shard data and configuration operations.

//------------------------------------------------------RPC part ----------------------------------------------------------

func (kv *ShardKV) Get(args *GetArgs, reply *GetReply) {
	shardId := key2shard(args.Key)
	kv.mu.Lock()
	if kv.Config.Shards[shardId] != kv.gid {
		reply.Err = ErrWrongGroup
	} else if kv.shardsPersist[shardId].KvMap == nil {
		reply.Err = ShardNotArrived
	}
	kv.mu.Unlock()
	if reply.Err == ErrWrongGroup || reply.Err == ShardNotArrived {
		return
	}
	command := Op{
		OpType:   GetType,
		ClientId: args.ClientId,
		SeqId:    args.RequestId,
		Key:      args.Key,
	}
	err := kv.startCommand(command, GetTimeout)
	if err != OK {
		reply.Err = err
		return
	}
	kv.mu.Lock()

	if kv.Config.Shards[shardId] != kv.gid {
		reply.Err = ErrWrongGroup
	} else if kv.shardsPersist[shardId].KvMap == nil {
		reply.Err = ShardNotArrived
	} else {
		reply.Err = OK
		reply.Value = kv.shardsPersist[shardId].KvMap[args.Key]
	}
	kv.mu.Unlock()
	return
}

func (kv *ShardKV) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {
	shardId := key2shard(args.Key)
	kv.mu.Lock()
	if kv.Config.Shards[shardId] != kv.gid {
		reply.Err = ErrWrongGroup
	} else if kv.shardsPersist[shardId].KvMap == nil {
		reply.Err = ShardNotArrived
	}
	kv.mu.Unlock()
	if reply.Err == ErrWrongGroup || reply.Err == ShardNotArrived {
		return
	}
	command := Op{
		OpType:   args.Op,
		ClientId: args.ClientId,
		SeqId:    args.RequestId,
		Key:      args.Key,
		Value:    args.Value,
	}
	reply.Err = kv.startCommand(command, AppOrPutTimeout)
	return
}

// AddShard moves a shard from the caller to this server.
func (kv *ShardKV) AddShard(args *SendShardArg, reply *AddShardReply) {
	command := Op{
		OpType:   AddShardType,
		ClientId: args.ClientId,
		SeqId:    args.RequestId,
		ShardId:  args.ShardId,
		Shard:    args.Shard,
		SeqMap:   args.LastAppliedRequestId,
	}
	reply.Err = kv.startCommand(command, AddShardsTimeout)
	return
}
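All three handlers funnel into startCommand, which the post does not show. Below is a minimal sketch consistent with how it is used: submit the command to raft, then wait on the channel for that log index until the apply loop answers or the timeout fires. The ErrInconsistentData and ErrOverTime error names are assumptions, not taken from the author's code.

// startCommand submits a command to raft and waits for the apply loop to report
// that it has been executed, or gives up after timeoutPeriod. Sketch only.
func (kv *ShardKV) startCommand(command Op, timeoutPeriod time.Duration) Err {
	kv.mu.Lock()
	index, _, isLeader := kv.rf.Start(command)
	if !isLeader {
		kv.mu.Unlock()
		return ErrWrongLeader
	}

	ch := kv.getWaitCh(index)
	kv.mu.Unlock()

	timer := time.NewTimer(timeoutPeriod)
	defer timer.Stop()

	select {
	case re := <-ch:
		kv.mu.Lock()
		delete(kv.waitChMap, index)
		// a different command may have been committed at this index after a leader change
		if re.SeqId != command.SeqId || re.ClientId != command.ClientId {
			kv.mu.Unlock()
			return ErrInconsistentData // assumed error name
		}
		kv.mu.Unlock()
		return re.Err
	case <-timer.C:
		return ErrOverTime // assumed error name
	}
}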

3.5 Handler part

These are the handlers invoked from applyMsgHandlerLoop for the shard and configuration-update operations.

//------------------------------------------------------handler part ------------------------------------------------------

// upConfigHandler installs a newer configuration.
func (kv *ShardKV) upConfigHandler(op Op) {
	curConfig := kv.Config
	upConfig := op.UpConfig
	if curConfig.Num >= upConfig.Num {
		return
	}
	for shard, gid := range upConfig.Shards {
		if gid == kv.gid && curConfig.Shards[shard] == 0 {
			// the new config assigns this shard to us and it was previously
			// unassigned (gid 0), so we can take ownership immediately
			kv.shardsPersist[shard].KvMap = make(map[string]string)
			kv.shardsPersist[shard].ConfigNum = upConfig.Num
		}
	}
	kv.LastConfig = curConfig
	kv.Config = upConfig
}

func (kv *ShardKV) addShardHandler(op Op) {
	// the shard is already present, or this is an outdated command
	if kv.shardsPersist[op.ShardId].KvMap != nil || op.Shard.ConfigNum < kv.Config.Num {
		return
	}

	kv.shardsPersist[op.ShardId] = kv.cloneShard(op.Shard.ConfigNum, op.Shard.KvMap)

	for clientId, seqId := range op.SeqMap {
		if r, ok := kv.SeqMap[clientId]; !ok || r < seqId {
			kv.SeqMap[clientId] = seqId
		}
	}
}

func (kv *ShardKV) removeShardHandler(op Op) {
	if op.SeqId < kv.Config.Num {
		return
	}
	kv.shardsPersist[op.ShardId].KvMap = nil
	kv.shardsPersist[op.ShardId].ConfigNum = op.SeqId
}
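Both addShardHandler and the config-detection loop use cloneShard, which is not shown either. It simply builds a Shard with the given config number and a deep copy of the key/value map, so that sender and receiver never share the same underlying map. A minimal sketch:

// cloneShard returns a Shard carrying the given config number and a deep copy
// of kvMap, so migrated data never aliases the sender's map. Sketch only.
func (kv *ShardKV) cloneShard(configNum int, kvMap map[string]string) Shard {
	migrated := Shard{
		KvMap:     make(map[string]string, len(kvMap)),
		ConfigNum: configNum,
	}
	for k, v := range kvMap {
		migrated.KvMap[k] = v
	}
	return migrated
}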

3.6 Snapshot part

This part is implemented the same way as in the previous labs; refer to the author's earlier lab posts, it is not repeated in detail here.

// PersistSnapShot produces the snapshot data of the kvserver.
func (kv *ShardKV) PersistSnapShot() []byte {
	w := new(bytes.Buffer)
	e := labgob.NewEncoder(w)
	err := e.Encode(kv.shardsPersist)
	err = e.Encode(kv.SeqMap)
	err = e.Encode(kv.maxRaftState)
	err = e.Encode(kv.Config)
	err = e.Encode(kv.LastConfig)
	if err != nil {
		log.Fatalf("[%d-%d] fails to take snapshot.", kv.gid, kv.me)
	}
	return w.Bytes()
}

// DecodeSnapShot installs a given snapshot.
func (kv *ShardKV) DecodeSnapShot(snapshot []byte) {
	if snapshot == nil || len(snapshot) < 1 { // bootstrap without any state?
		return
	}
	r := bytes.NewBuffer(snapshot)
	d := labgob.NewDecoder(r)

	var shardsPersist []Shard
	var SeqMap map[int64]int
	var MaxRaftState int
	var Config, LastConfig shardctrler.Config

	if d.Decode(&shardsPersist) != nil || d.Decode(&SeqMap) != nil ||
		d.Decode(&MaxRaftState) != nil || d.Decode(&Config) != nil || d.Decode(&LastConfig) != nil {
		log.Fatalf("[Server(%v)] Failed to decode snapshot!!!", kv.me)
	} else {
		kv.shardsPersist = shardsPersist
		kv.SeqMap = SeqMap
		kv.maxRaftState = MaxRaftState
		kv.Config = Config
		kv.LastConfig = LastConfig
	}
}

4. Lab musings

Regarding the lab's challenge exercises:

  • For the whole distributed, concurrent process, I think the point of preserving correct results is to make sure that a group of operations, invoked any number of times at different moments, produces the expected sequence regardless of external factors (timing and so on). It reminds me of recently re-reading The Art of Java Concurrency Programming: happens-before essentially creates an illusion for the programmer that a correctly synchronized multithreaded program executes in the order happens-before specifies. No matter how the compiler and the processor reorder and optimize under the hood, the final result is what you expected. Applied to this lab, I prefer to describe it as trying to keep the whole system linearizable (linearization): once linearizability holds, the system may look concurrent, but in effect it behaves serially.

For this lab's challenge, we just need to pin down the concrete shard-migration process:

update config -> allSent? -> (no) send shards -> allReceived? -> (yes) poll the next config
Defining a linear control flow like this is how the challenge is handled.

func (kv *ShardKV) allSent() bool {
	for shard, gid := range kv.LastConfig.Shards {
		// we owned this shard in the last config, it no longer belongs to us in the
		// current config, and its persisted config number is still old: not sent yet
		if gid == kv.gid && kv.Config.Shards[shard] != kv.gid && kv.shardsPersist[shard].ConfigNum < kv.Config.Num {
			return false
		}
	}
	return true
}

func (kv *ShardKV) allReceived() bool {
	for shard, gid := range kv.LastConfig.Shards {
		// we did not own this shard in the last config, it belongs to us in the
		// current config, and its persisted config number is still old: not received yet
		if gid != kv.gid && kv.Config.Shards[shard] == kv.gid && kv.shardsPersist[shard].ConfigNum < kv.Config.Num {
			return false
		}
	}
	return true
}

Summary

Finishing this lab means the whole set of 6.824 labs is officially done; if I find time later I may write up my understanding of distributed systems. I have to say this series of labs is really hard and often gave me headaches, which only makes me admire the people who designed the labs more. Part of my original intention for this series of posts was simply my own convenience later on, but recently more and more students are working on 6.824 (and my diagrams are getting more practiced), so I also hope the blog can help readers. Of course these are only the author's shallow opinions, for reference only, and there will surely be some misunderstandings; corrections are welcome~

  • Test screenshot:
    [Figure: test results screenshot]
    (This run is not far off the roughly 100s associated with the lab, and the client code was not rewritten, so there is still room for optimization, but there is really no time recently orz… busy with all kinds of things. The unreliable tests under concurrency sometimes fail; I have not had time to look into that yet and will fix it later.)
    The gitee repository is attached for reference: Mit6.824-2022
