6.824 Lab 1: MapReduce
2022-06-13 00:14:00 【Ethan97】
Brief introduction
Build a MapReduce system following the MapReduce paper. The system consists of a master and workers. The master is mainly responsible for distributing tasks and handling worker failures. The workers read and write files using the map and reduce functions.
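The word-count sketch below is a minimal, single-process illustration of what the map and reduce functions do. It is not part of the lab code. mapf, reducef and runSequential are hypothetical names, although mapf and reducef follow the signatures that Worker receives in worker.go. The map step emits a (word, "1") pair for every word. The pairs are then sorted and grouped by key, and the reduce step counts the values of each key.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
	"strings"
	"unicode"
)

// KeyValue mirrors the type used by the worker.
type KeyValue struct {
	Key   string
	Value string
}

// mapf (hypothetical) emits ("word", "1") for every word in contents.
func mapf(filename string, contents string) []KeyValue {
	words := strings.FieldsFunc(contents, func(r rune) bool { return !unicode.IsLetter(r) })
	kva := make([]KeyValue, 0, len(words))
	for _, w := range words {
		kva = append(kva, KeyValue{Key: w, Value: "1"})
	}
	return kva
}

// reducef (hypothetical) counts how many values were emitted for a key.
func reducef(key string, values []string) string {
	return strconv.Itoa(len(values))
}

// runSequential maps every input, sorts all pairs by key, and reduces each group.
func runSequential(inputs map[string]string) map[string]string {
	var all []KeyValue
	for name, contents := range inputs {
		all = append(all, mapf(name, contents)...)
	}
	sort.Slice(all, func(i, j int) bool { return all[i].Key < all[j].Key })
	out := make(map[string]string)
	for i := 0; i < len(all); {
		j := i
		var values []string
		for j < len(all) && all[j].Key == all[i].Key {
			values = append(values, all[j].Value)
			j++
		}
		out[all[i].Key] = reducef(all[i].Key, values)
		i = j
	}
	return out
}

func main() {
	counts := runSequential(map[string]string{
		"a.txt": "the quick fox",
		"b.txt": "the lazy dog, the end",
	})
	fmt.Println(counts["the"], counts["fox"]) // 3 1
}
```

The distributed version runs the same three steps. The difference is that map output goes to files, and the grouping is split across reduce tasks.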
Ideas
- Task distribution: the master puts the tasks to be completed into a channel. Workers take tasks from the channel (through an RPC) and perform the corresponding operation according to the task type.
- Fault tolerance: the master tracks the completion of each task. If a task has not completed after a certain period of time (10 seconds in the code), the master publishes it again.
- Completion check: the master decides whether a task is complete by checking whether its target file exists in the current directory. For a map task this is the intermediate file mr-X; for a reduce task it is the output file mr-out-X, written when the task finishes. Before starting, the master deletes any existing file with the same name as an intermediate file, so that a leftover file is not mistaken for a completed task at run time.
- Program exit: once all tasks are complete, the master sets done = true under mutual exclusion. mrmaster then calls the Done() method, finds that the job is complete and exits cleanly. After the master has exited, a worker concludes that all tasks are finished when its RPC can no longer reach the master.
- Avoiding concurrency errors: ioutil.TempFile creates a temporary file with a unique name, and os.Rename then renames it atomically. As a result, a reader never sees a half-written file, even if two workers execute the same task. Note that os.Rename is only atomic when the source and destination are on the same filesystem.
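The temp-file-then-rename pattern can be sketched as a small helper. This is a hedged illustration: writeFileAtomically and demo are hypothetical names that do not appear in the original code. The sketch shows that when two workers write the same output, exactly one complete file remains. It creates the temp file in the target directory because os.Rename only works atomically within a single filesystem.

```go
package main

import (
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
)

// writeFileAtomically (hypothetical) writes data to a uniquely named temp
// file in dir and then renames it to dir/name. Readers therefore see either
// no file or the complete file, never a partially written one.
func writeFileAtomically(dir, name string, data []byte) error {
	// same directory as the target: os.Rename is only atomic within one filesystem
	tmp, err := ioutil.TempFile(dir, "mr-tmp-*")
	if err != nil {
		return err
	}
	if _, err := tmp.Write(data); err != nil {
		tmp.Close()
		os.Remove(tmp.Name())
		return err
	}
	if err := tmp.Close(); err != nil {
		os.Remove(tmp.Name())
		return err
	}
	return os.Rename(tmp.Name(), filepath.Join(dir, name))
}

// demo (hypothetical) simulates two workers finishing the same reduce task and
// returns the final content and the number of files left in the directory.
func demo() (string, int, error) {
	dir, err := ioutil.TempDir("", "atomic-demo")
	if err != nil {
		return "", 0, err
	}
	defer os.RemoveAll(dir)

	for worker := 0; worker < 2; worker++ {
		if err := writeFileAtomically(dir, "mr-out-0", []byte("hello 1\n")); err != nil {
			return "", 0, err
		}
	}
	content, err := ioutil.ReadFile(filepath.Join(dir, "mr-out-0"))
	if err != nil {
		return "", 0, err
	}
	entries, err := ioutil.ReadDir(dir)
	if err != nil {
		return "", 0, err
	}
	return string(content), len(entries), nil
}

func main() {
	content, n, err := demo()
	if err != nil {
		panic(err)
	}
	fmt.Printf("%q, %d file(s)\n", content, n)
}
```

The later rename simply replaces the earlier file. Because both workers produce identical output for the same task, it does not matter which rename wins.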
Implementation
The three files rpc.go, master.go and worker.go are given below.
rpc.go
rpc.go defines the data structures that the master and workers use to communicate:
package mr

//
// RPC definitions.
//
// remember to capitalize all names.
//

import "os"
import "strconv"

type TaskRequest struct{}

type TaskType int

const (
	MapTask    TaskType = 1
	ReduceTask TaskType = 2
)

type TaskResponse struct {
	// for a map task, Filename is the file that needs to be mapped; otherwise it is the empty string
	Filename string
	// task type is either MapTask or ReduceTask
	TypeOfTask TaskType
	// this is the serial number of the task
	Serial int
	// NReduce is the number of buckets the intermediate results are divided into
	NReduce int
}

// Cook up a unique-ish UNIX-domain socket name
// in /var/tmp, for the master.
// Can't use the current directory since
// Athena AFS doesn't support UNIX-domain sockets.
func masterSock() string {
	s := "/var/tmp/824-mr-"
	s += strconv.Itoa(os.Getuid())
	return s
}
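The following minimal sketch shows the DispatchTask RPC round trip built on these types. To stay self-contained, it assumes a hypothetical Coordinator type in place of Master. It also connects the client and server through net.Pipe rather than the UNIX-domain socket returned by masterSock. rpc.NewServer, Register, ServeConn, NewClient and Call are standard net/rpc functions, and the task filename is made up.

```go
package main

import (
	"fmt"
	"net"
	"net/rpc"
)

// These types mirror rpc.go.
type TaskRequest struct{}

type TaskType int

const (
	MapTask    TaskType = 1
	ReduceTask TaskType = 2
)

type TaskResponse struct {
	Filename   string
	TypeOfTask TaskType
	Serial     int
	NReduce    int
}

// Coordinator is a hypothetical stand-in for Master that owns a task queue.
type Coordinator struct {
	tasks chan TaskResponse
}

// DispatchTask follows net/rpc's rules: an exported method on an exported type,
// two arguments of exported types (the reply is a pointer) and an error result.
// Like the original, it blocks until a task is available.
func (c *Coordinator) DispatchTask(req *TaskRequest, resp *TaskResponse) error {
	*resp = <-c.tasks
	return nil
}

// callOnce connects client and server over an in-memory net.Pipe and makes one RPC.
func callOnce() (TaskResponse, error) {
	coord := &Coordinator{tasks: make(chan TaskResponse, 1)}
	coord.tasks <- TaskResponse{Filename: "pg-example.txt", TypeOfTask: MapTask, Serial: 0, NReduce: 10}

	server := rpc.NewServer()
	if err := server.Register(coord); err != nil {
		return TaskResponse{}, err
	}
	serverConn, clientConn := net.Pipe()
	go server.ServeConn(serverConn)

	client := rpc.NewClient(clientConn)
	defer client.Close()

	var resp TaskResponse
	err := client.Call("Coordinator.DispatchTask", &TaskRequest{}, &resp)
	return resp, err
}

func main() {
	resp, err := callOnce()
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", resp)
}
```

net/rpc uses the receiver's type name as the service name. So when Master itself is registered, as in server(), the worker calls the method as "Master.DispatchTask".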
master.go
The master is implemented as follows:
package mr

import (
	"log"
	"net"
	"net/http"
	"net/rpc"
	"os"
	"strconv"
	"time"
)

type Master struct {
	// use TaskChannel to deliver tasks to workers
	TaskChannel chan TaskResponse
	// done will be true once all tasks are done
	done bool
	// sem protects done from concurrent read/write
	sem chan struct{}
}

// keep track of a task
type TaskTrack struct {
	taskResp  TaskResponse
	startTime time.Time
}
func (m *Master) DispatchTask(request *TaskRequest, response *TaskResponse) error {
	// extract a task from channel
	// if there is no task available, the thread which calls this function will go to sleep
	*response = <-m.TaskChannel
	return nil
}

//
// main/mrmaster.go calls Done() periodically to find out
// if the entire job has finished.
//
func (m *Master) Done() bool {
	// read m.done exclusively
	<-m.sem
	ret := m.done
	m.sem <- struct{}{}
	return ret
}

// task expires after ten seconds
func isExpired(task TaskTrack) bool {
	return time.Since(task.startTime) > 10*time.Second
}
func dispatcher(files []string, nReduce int, m *Master) {
	// remove stale intermediate files (mr-X) and output files (mr-out-Y) so
	// that a leftover file is not mistaken for a completed task
	var stale []string
	for i := 0; i < len(files); i++ {
		stale = append(stale, "mr-"+strconv.Itoa(i))
	}
	for i := 0; i < nReduce; i++ {
		stale = append(stale, "mr-out-"+strconv.Itoa(i))
	}
	for _, filename := range stale {
		err := os.Remove(filename)
		if err != nil && !os.IsNotExist(err) {
			log.Fatalf("error occurs while removing file %v: %v", filename, err)
		}
	}
	var unfinishedTasks []TaskTrack
	//-------------------------------------------- dispatch map task --------------------------------------------
	for i, file := range files {
		resp := TaskResponse{Filename: file, TypeOfTask: MapTask, Serial: i}
		m.TaskChannel <- resp
		unfinishedTasks = append(unfinishedTasks, TaskTrack{taskResp: resp, startTime: time.Now()})
	}
	// check if all map tasks are complete
	for len(unfinishedTasks) > 0 {
		for i := 0; i < len(unfinishedTasks); i++ {
			track := unfinishedTasks[i]
			filename := "mr-" + strconv.Itoa(track.taskResp.Serial)
			// check if the intermediate file exists
			if _, err := os.Stat(filename); err == nil {
				// the file exists, which indicates that this task is completed
				unfinishedTasks = append(unfinishedTasks[:i], unfinishedTasks[i+1:]...)
				i--
			} else if len(m.TaskChannel) == 0 && isExpired(track) {
				// the dispatch channel is empty and this task has expired: emit it again
				m.TaskChannel <- track.taskResp
				// reset startTime of this task
				unfinishedTasks[i].startTime = time.Now()
			}
		}
		time.Sleep(time.Second)
	}
	//-------------------------------------------- dispatch reduce task --------------------------------------------
	// all map tasks are completed, now start to emit reduce tasks
	// there are nReduce reduce tasks in total
	for i := 0; i < nReduce; i++ {
		resp := TaskResponse{TypeOfTask: ReduceTask, Serial: i, NReduce: nReduce}
		m.TaskChannel <- resp
		unfinishedTasks = append(unfinishedTasks, TaskTrack{taskResp: resp, startTime: time.Now()})
	}
	// check if all reduce tasks are complete
	for len(unfinishedTasks) > 0 {
		for i := 0; i < len(unfinishedTasks); i++ {
			track := unfinishedTasks[i]
			filename := "mr-out-" + strconv.Itoa(track.taskResp.Serial)
			if _, err := os.Stat(filename); err == nil {
				unfinishedTasks = append(unfinishedTasks[:i], unfinishedTasks[i+1:]...)
				i--
			} else if len(m.TaskChannel) == 0 && isExpired(track) {
				m.TaskChannel <- track.taskResp
				// reset startTime
				unfinishedTasks[i].startTime = time.Now()
			}
		}
		time.Sleep(time.Second)
	}
	// exclusively set status to done
	<-m.sem
	m.done = true
	m.sem <- struct{}{}
}
//
// create a Master.
// main/mrmaster.go calls this function.
// nReduce is the number of reduce tasks to use.
//
func MakeMaster(files []string, nReduce int) *Master {
	// initialize master; sem is a buffered channel of capacity 1 used as a mutex
	m := Master{
		TaskChannel: make(chan TaskResponse, 100),
		sem:         make(chan struct{}, 1),
	}
	// put the single token into sem so that the first <-m.sem does not block
	m.sem <- struct{}{}
	// dispatch tasks in another goroutine
	go dispatcher(files, nReduce, &m)
	// start a thread that listens for RPCs from worker.go
	m.server()
	return &m
}

//
// start a thread that listens for RPCs from worker.go
//
func (m *Master) server() {
	rpc.Register(m)
	rpc.HandleHTTP()
	//l, e := net.Listen("tcp", ":1234")
	sockname := masterSock()
	os.Remove(sockname)
	l, e := net.Listen("unix", sockname)
	if e != nil {
		log.Fatal("listen error:", e)
	}
	go http.Serve(l, nil)
}
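For testing, the completion check in dispatcher can be separated from the file system. The sketch below simplifies the original in several ways. trackedTask, sweep and demoSweep are hypothetical names, and isDone stands in for the os.Stat check on mr-X / mr-out-X. A single queueEmpty flag also replaces the len(m.TaskChannel) == 0 test, which the original repeats before every send.

```go
package main

import (
	"fmt"
	"time"
)

// trackedTask is a hypothetical, reduced form of TaskTrack.
type trackedTask struct {
	serial    int
	startTime time.Time
}

// sweep performs one pass of the dispatcher's checking loop without touching
// files or channels. Tasks for which isDone reports true are dropped. When the
// dispatch queue is empty, tasks older than timeout are returned for
// re-dispatch, with their start time reset to now.
func sweep(tracked []trackedTask, now time.Time, timeout time.Duration,
	isDone func(serial int) bool, queueEmpty bool) (remaining []trackedTask, redispatch []int) {
	for _, t := range tracked {
		if isDone(t.serial) {
			continue // output file exists: the task is finished
		}
		if queueEmpty && now.Sub(t.startTime) > timeout {
			redispatch = append(redispatch, t.serial)
			t.startTime = now // the new attempt gets a full timeout
		}
		remaining = append(remaining, t)
	}
	return remaining, redispatch
}

// demoSweep: task 0 is finished, task 1 started 11s ago, task 2 started 2s ago.
func demoSweep() (int, []int) {
	start := time.Date(2022, 6, 13, 0, 0, 0, 0, time.UTC)
	tracked := []trackedTask{{0, start}, {1, start}, {2, start.Add(9 * time.Second)}}
	finished := map[int]bool{0: true}
	now := start.Add(11 * time.Second)
	remaining, again := sweep(tracked, now, 10*time.Second,
		func(serial int) bool { return finished[serial] }, true)
	return len(remaining), again
}

func main() {
	n, again := demoSweep()
	fmt.Println(n, again) // 2 [1]
}
```

Building a new remaining slice avoids the append(unfinishedTasks[:i], ...) plus i-- index adjustment used in dispatcher. Both approaches drop finished tasks and keep the rest in their original order.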
worker.go
The worker is implemented as follows:
package mr

import (
	"encoding/json"
	"fmt"
	"hash/fnv"
	"io/ioutil"
	"log"
	"net/rpc"
	"os"
	"sort"
	"strconv"
)

//
// Map functions return a slice of KeyValue.
//
type KeyValue struct {
	Key   string
	Value string
}

// for sorting by key.
type ByKey []KeyValue

// for sorting by key.
func (a ByKey) Len() int           { return len(a) }
func (a ByKey) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }

//
// use ihash(key) % NReduce to choose the reduce
// task number for each KeyValue emitted by Map.
//
func ihash(key string) int {
	h := fnv.New32a()
	_, err := h.Write([]byte(key))
	if err != nil {
		log.Fatalf("error occurs while hashing key %v", key)
	}
	return int(h.Sum32() & 0x7fffffff)
}
//
// main/mrworker.go calls this function.
//
func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {

	// Your worker implementation here.
	// the worker keeps asking the master for work over RPC
	for {
		task, ok := askForTask()
		// fail to contact master, which indicates that all tasks are done
		if !ok {
			break
		}
		if task.TypeOfTask == MapTask {
			doMapTask(task, mapf)
		} else {
			doReduceTask(task, reducef)
		}
	}

	// uncomment to send the Example RPC to the master.
	//CallExample()
}

func askForTask() (TaskResponse, bool) {
	request := TaskRequest{}
	response := TaskResponse{}
	ok := call("Master.DispatchTask", &request, &response)
	return response, ok
}
func doMapTask(task TaskResponse, mapf func(string, string) []KeyValue) {
	filename := task.Filename
	file, err := os.Open(filename)
	if err != nil {
		log.Fatalf("cannot open %v", filename)
	}
	content, err := ioutil.ReadAll(file)
	if err != nil {
		log.Fatalf("cannot read %v", filename)
	}
	err = file.Close()
	if err != nil {
		log.Fatalf("cannot close file %v", file.Name())
	}
	kva := mapf(filename, string(content))
	// create the temp file in the current directory, because os.Rename
	// only works (atomically) within a single filesystem
	tempFile, err := ioutil.TempFile(".", "mr-tmp-*")
	if err != nil {
		log.Fatalf("cannot create temp file: %v", err)
	}
	enc := json.NewEncoder(tempFile)
	for _, kv := range kva {
		err := enc.Encode(&kv)
		if err != nil {
			log.Fatalf("cannot encode kv %v into file %v", kv, tempFile.Name())
		}
	}
	err = tempFile.Close()
	if err != nil {
		log.Fatalf("error occurs while closing file %v", tempFile.Name())
	}
	// intermediate kv pairs are saved in mr-X
	target := "mr-" + strconv.Itoa(task.Serial)
	err = os.Rename(tempFile.Name(), target)
	if err != nil {
		log.Fatalf("error occurs while renaming file %v to %s", tempFile.Name(), target)
	}
}
func doReduceTask(task TaskResponse, reducef func(string, []string) string) {
	var kva []KeyValue
	// create the temp file in the current directory, because os.Rename
	// only works (atomically) within a single filesystem
	tempFile, err := ioutil.TempFile(".", "mr-tmp-*")
	if err != nil {
		log.Fatalf("cannot create temp file: %v", err)
	}
	// go to every intermediate file to collect the keys of this reduce task
	for i := 0; ; i++ {
		filename := "mr-" + strconv.Itoa(i)
		file, err := os.Open(filename)
		if err != nil {
			if os.IsNotExist(err) {
				// all intermediate files have been read
				break
			}
			log.Fatalf("error occurs while opening file %v: %v", filename, err)
		}
		dec := json.NewDecoder(file)
		for {
			var kv KeyValue
			if err := dec.Decode(&kv); err != nil {
				break
			}
			// select keys that this worker needs to reduce
			if ihash(kv.Key)%task.NReduce == task.Serial {
				kva = append(kva, kv)
			}
		}
		// close each intermediate file once it has been read
		file.Close()
	}
	sort.Sort(ByKey(kva))
	// call reducef once for each distinct key, with all of that key's values
	j := 0
	for j < len(kva) {
		k := j + 1
		for k < len(kva) && kva[j].Key == kva[k].Key {
			k++
		}
		var values []string
		for u := j; u < k; u++ {
			values = append(values, kva[u].Value)
		}
		output := reducef(kva[j].Key, values)
		_, err := fmt.Fprintf(tempFile, "%v %v\n", kva[j].Key, output)
		if err != nil {
			log.Fatalf("error occurs while writing into temp file %v", tempFile.Name())
		}
		j = k
	}
	err = tempFile.Close()
	if err != nil {
		log.Fatalf("error occurs while closing file %v", tempFile.Name())
	}
	target := "mr-out-" + strconv.Itoa(task.Serial)
	err = os.Rename(tempFile.Name(), target)
	if err != nil {
		log.Fatalf("error occurs while renaming file %v to %s", tempFile.Name(), target)
	}
}
//
// send an RPC request to the master, wait for the response.
// usually returns true.
// returns false if something goes wrong.
//
func call(rpcname string, args interface{}, reply interface{}) bool {
	// c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
	sockname := masterSock()
	c, err := rpc.DialHTTP("unix", sockname)
	if err != nil {
		// the master has exited: report failure instead of calling
		// log.Fatal, so that Worker can leave its loop normally
		fmt.Println("dialing:", err)
		return false
	}
	defer c.Close()

	err = c.Call(rpcname, args, reply)
	if err == nil {
		return true
	}

	fmt.Println(err)
	return false
}
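The sketch below isolates how intermediate pairs reach the right reduce task. It reuses ihash from worker.go; partition, reduceBucket and countValues are hypothetical helpers. In the original, each map task writes a single mr-X file, and every reduce task filters it with ihash(kv.Key) % NReduce == Serial. Here, partition does the split up front instead. Both approaches send all pairs with the same key to the same reduce task.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
	"strconv"
)

type KeyValue struct {
	Key   string
	Value string
}

// ihash is the same FNV-1a based hash as in worker.go.
func ihash(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key)) // hash.Hash's Write never returns an error
	return int(h.Sum32() & 0x7fffffff)
}

// partition (hypothetical) splits kva into nReduce buckets. Bucket r holds
// exactly the pairs that reduce task r keeps with its
// ihash(kv.Key)%NReduce == Serial filter.
func partition(kva []KeyValue, nReduce int) [][]KeyValue {
	buckets := make([][]KeyValue, nReduce)
	for _, kv := range kva {
		r := ihash(kv.Key) % nReduce
		buckets[r] = append(buckets[r], kv)
	}
	return buckets
}

// reduceBucket (hypothetical) sorts a bucket by key and calls reducef once per
// distinct key. It returns "key output" lines, the format doReduceTask writes.
func reduceBucket(bucket []KeyValue, reducef func(string, []string) string) []string {
	sort.Slice(bucket, func(i, j int) bool { return bucket[i].Key < bucket[j].Key })
	var lines []string
	for i := 0; i < len(bucket); {
		j := i
		var values []string
		for j < len(bucket) && bucket[j].Key == bucket[i].Key {
			values = append(values, bucket[j].Value)
			j++
		}
		lines = append(lines, bucket[i].Key+" "+reducef(bucket[i].Key, values))
		i = j
	}
	return lines
}

// countValues is a word-count style reduce function.
func countValues(key string, values []string) string {
	return strconv.Itoa(len(values))
}

func main() {
	kva := []KeyValue{{"a", "1"}, {"b", "1"}, {"a", "1"}, {"c", "1"}}
	for r, bucket := range partition(kva, 3) {
		fmt.Println(r, reduceBucket(bucket, countValues))
	}
}
```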