Current location: Home > 6.824 Lab 1: MapReduce

6.824 Lab 1: MapReduce

2022-06-13 00:14:00 Ethan97

Introduction

Following the MapReduce paper, we build a MapReduce system made up of a master and workers. The master is mainly responsible for distributing tasks and handling worker failures; the workers read and write files using the map and reduce functions.

Design ideas

  • Task distribution: the master puts the tasks to be done into a channel; workers take tasks from the channel and perform the operation that matches each task's type.
  • Fault tolerance: the master tracks the completion of every task. If a task has not finished within a certain time (10 seconds here), the master issues it again.
  • Completion detection: the master decides whether a task is finished by checking whether the task's output file exists in the current directory — the intermediate file mr-X for map task X, and the final output file mr-out-X for reduce task X. Therefore, before starting, the master must delete any existing files with these names; otherwise a file left over from an earlier run would be mistaken for a completed task.
  • Program exit: once all tasks are complete, the master sets done = true under mutual exclusion. mrmaster calls Done() periodically, sees that the job is complete, and exits cleanly. After the master has exited, a worker whose RPC cannot reach the master concludes that all tasks have been completed.
  • Avoiding concurrency errors: use ioutil.TempFile to create a temporary file with a unique name, then os.Rename to give it its final name atomically. (os.Rename only works within a single filesystem, so the temporary file should be created on the same filesystem as its destination.)

Implementation

The three files rpc.go, master.go and worker.go are listed below.

rpc.go

rpc.go defines the data structures the master and the workers use to communicate:

package mr

//
// RPC definitions.
//
// remember to capitalize all names.
//

import "os"
import "strconv"

// TaskRequest is the argument a worker sends with the Master.DispatchTask
// RPC. It carries no data: the call itself is the request for work.
type TaskRequest struct{}

// TaskType tells a worker which kind of work a TaskResponse describes.
type TaskType int

// Task kinds. They are declared with type TaskType (rather than as untyped
// integer constants) so that e.g. `x := MapTask` yields a TaskType, not an int.
const (
	MapTask    TaskType = 1
	ReduceTask TaskType = 2
)

// TaskResponse is the reply to the Master.DispatchTask RPC; it describes one
// task for the worker to run.
type TaskResponse struct {
	// Filename is the input file of a map task; it is the empty string for
	// reduce tasks.
	Filename string

	// TypeOfTask is either MapTask or ReduceTask.
	TypeOfTask TaskType

	// Serial is the task number: map task i processes input files[i] and
	// produces mr-i; reduce task r produces mr-out-r.
	Serial int

	// NReduce is the number of reduce partitions used to bucket intermediate
	// keys (ihash(key) % NReduce). The dispatcher sets it on reduce tasks.
	NReduce int
}

// Cook up a unique-ish UNIX-domain socket name
// in /var/tmp, for the master.
// Can't use the current directory since
// Athena AFS doesn't support UNIX-domain sockets.
func masterSock() string {
    
	s := "/var/tmp/824-mr-"
	s += strconv.Itoa(os.Getuid())
	return s
}

master.go

master The implementation is as follows :

package mr

import (
	"log"
	"net"
	"net/http"
	"net/rpc"
	"strconv"
	"time"
)
import "os"

type Master struct {
    
	// user TaskChannel to deliver task to workers
	TaskChannel chan TaskResponse
	// done will be true if all task is done
	done bool
	// sem is to protect done from concurrent read/write
	sem chan struct{
    }
}

// keep track of task
type TaskTrack struct {
    
	taskResp TaskResponse
	startTime time.Time
}

func (m *Master) DispatchTask(request *TaskRequest, response *TaskResponse) error {
    
	// extract a task from channel
	// if there is no task available, the thread which calls this function will go to sleep
	temp := <-m.TaskChannel
	response.Filename = temp.Filename
	response.TypeOfTask = temp.TypeOfTask
	response.Serial = temp.Serial
	response.NReduce = temp.NReduce
	return nil
}

//
// main/mrmaster.go calls Done() periodically to find out
// if the entire job has finished.
//
func (m *Master) Done() bool {
    
	ret := false

	// read m.done exclusively
	<- m.sem
	ret = m.done
	m.sem <- struct{
    }{
    }

	return ret
}


// task expires after ten seconds
func isExpired(task TaskTrack) bool {
    
	return time.Now().Sub(task.startTime).Seconds() > 10
}

func dispatcher(files []string, nReduce int, m *Master) {
    

	// remove intermediate files in case there is any collision
	for i := 0; i < len(files); i++ {
    
		filename := "mr-" + strconv.Itoa(i)
		err := os.Remove(filename)
		if err != nil && !os.IsNotExist(err) {
    
			log.Fatalf("error occurs while removing file %v", filename)
		}
	}

	var unfinishedTasks []TaskTrack
	//-------------------------------------------- dispatch map task --------------------------------------------
	for i, file := range files {
    
		resp := TaskResponse{
    Filename: file, TypeOfTask: MapTask, Serial: i}
		m.TaskChannel <- resp
		unfinishedTasks = append(unfinishedTasks, TaskTrack{
    taskResp: resp, startTime: time.Now()})
	}
	// check if all map tasks are complete
	for len(unfinishedTasks) > 0 {
    
		for i := 0; i < len(unfinishedTasks); i++ {
    
			track := unfinishedTasks[i]
			filename := "mr-" + strconv.Itoa(track.taskResp.Serial)
			// check if intermediate file exists
			if _, err := os.Stat(filename); err == nil {
    
				// filename exists, which indicates that this track is completed
				unfinishedTasks = append(unfinishedTasks[:i], unfinishedTasks[i + 1:]...)
				i--
			} else if len(m.TaskChannel) == 0 && isExpired(track) {
    
				// track dispatch channel is empty && this task is expired, emit this task again
				m.TaskChannel <- track.taskResp
				// reset startTime of this task
				unfinishedTasks[i].startTime = time.Now()
			}
		}
		time.Sleep(time.Second)
	}

	//-------------------------------------------- dispatch reduce task --------------------------------------------
	// all map tasks are completed, now start to emit reduce task
	// there are nReduce reduce tasks in total
	for i := 0; i < nReduce; i++ {
    
		resp := TaskResponse{
    TypeOfTask: ReduceTask, Serial: i, NReduce: nReduce}
		m.TaskChannel <- resp
		unfinishedTasks = append(unfinishedTasks, TaskTrack{
    taskResp: resp, startTime: time.Now()})
	}

	// check if all reduce tasks are complete
	for len(unfinishedTasks) > 0 {
    
		for i := 0; i < len(unfinishedTasks); i++ {
    
			track := unfinishedTasks[i]
			filename := "mr-out-" + strconv.Itoa(track.taskResp.Serial)
			if _, err := os.Stat(filename); err == nil {
    
				unfinishedTasks = append(unfinishedTasks[:i], unfinishedTasks[i + 1:]...)
				i--
			} else if len(m.TaskChannel) == 0 && isExpired(track) {
    
				m.TaskChannel <- track.taskResp
				// reset startTime
				unfinishedTasks[i].startTime = time.Now()
			}
		}
		time.Sleep(time.Second)
	}

	// exclusively set status to done
	<- m.sem
	m.done = true
	m.sem <- struct{
    }{
    }
}


//
// create a Master.
// main/mrmaster.go calls this function.
// nReduce is the number of reduce tasks to use.
//
func MakeMaster(files []string, nReduce int) *Master {
    

	// initialize master
	m := Master{
    TaskChannel: make(chan TaskResponse, 100), sem: make(chan struct{
    }, 1)}
	m.sem <- struct{
    }{
    }

	// dispatcher tasks in another thread
	go dispatcher(files, nReduce, &m)

	// start a thread that listens for RPCs from worker.go
	m.server()
	return &m
}

//
// start a thread that listens for RPCs from worker.go
//
func (m *Master) server() {
    
	rpc.Register(m)
	rpc.HandleHTTP()
	//l, e := net.Listen("tcp", ":1234")
	sockname := masterSock()
	os.Remove(sockname)
	l, e := net.Listen("unix", sockname)
	if e != nil {
    
		log.Fatal("listen error:", e)
	}
	go http.Serve(l, nil)
}

worker.go

worker The implementation is as follows :

package mr

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"os"
	"sort"
	"strconv"
)
import "log"
import "net/rpc"
import "hash/fnv"


//
// Map functions return a slice of KeyValue.
//
type KeyValue struct {
    
	Key   string
	Value string
}

// for sorting by key.
type ByKey []KeyValue

// for sorting by key.
func (a ByKey) Len() int           {
     return len(a) }
func (a ByKey) Swap(i, j int)      {
     a[i], a[j] = a[j], a[i] }
func (a ByKey) Less(i, j int) bool {
     return a[i].Key < a[j].Key }

//
// use ihash(key) % NReduce to choose the reduce
// task number for each KeyValue emitted by Map.
//
func ihash(key string) int {
    
	h := fnv.New32a()
	_, err := h.Write([]byte(key))
	if err != nil {
    
		log.Fatalf("error occurs while hashing key %v", key)
	}
	return int(h.Sum32() & 0x7fffffff)
}


//
// main/mrworker.go calls this function.
//
func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {
    
	// Your worker implementation here.
	// worker call rpc towards master Ask for work 
	for true {
    
		task, ok := askForTask()
		// fail to contact master, which indicates that all tasks are done
		if !ok {
    
			break
		}

		if task.TypeOfTask == MapTask {
    
			doMapTask(task, mapf)
		} else {
    
			doReduceTask(task, reducef)
		}
	}

	// uncomment to send the Example RPC to the master.
	//CallExample()
}

func askForTask() (TaskResponse, bool) {
    
	request := TaskRequest{
    }
	response := TaskResponse{
    }
	ok := call("Master.DispatchTask", &request, &response)
	return response, ok
}


func doMapTask(task TaskResponse, mapf func(string, string) []KeyValue) {
    
	filename := task.Filename
	file, err := os.Open(filename)
	if err != nil {
    
		log.Fatalf("cannot open %v", filename)
	}
	content, err := ioutil.ReadAll(file)
	if err != nil {
    
		log.Fatalf("cannot read %v", filename)
	}
	err = file.Close()
	if err != nil {
    
		log.Fatalf("cannot close file %v", file.Name())
	}
	kva := mapf(filename, string(content))
	tempFile, err := ioutil.TempFile("", "")
	if err != nil {
    
		log.Fatalf("cannot create tempFile %v", tempFile)
	}
	enc := json.NewEncoder(tempFile)
	for _, kv := range kva {
    
		err := enc.Encode(&kv)
		if err != nil {
    
			log.Fatalf("cannot encode kv %v into file %v", kv, tempFile)
		}
	}
	err = tempFile.Close()
	if err != nil {
    
		log.Fatalf("error occurs while closing file %v", tempFile)
	}
	// intermediate kv pairs are saved in mr-X
	err = os.Rename(tempFile.Name(), "mr-"+strconv.Itoa(task.Serial))
	if err != nil {
    
		log.Fatalf("err occurs while renaming file %v to %s", tempFile, "mr-"+strconv.Itoa(task.Serial))
	}
}

func doReduceTask(task TaskResponse, reducef func(string, []string) string) {
    
	var kva []KeyValue
	tempFile, err := ioutil.TempFile("", "")
	if err != nil {
    
		log.Fatalf("cannot create tempFile %v", tempFile)
	}
	// go to every intermediate file to collect corresponding keys
	i := 0
	for {
    
		file, err := os.Open("mr-" + strconv.Itoa(i))
		if err != nil {
    
			if os.IsNotExist(err) {
    
				// all intermediate files are read
				break
			} else {
    
				log.Fatalf("error occurs while openning a file %v", "mr-" + strconv.Itoa(i))
			}
		}
		dec := json.NewDecoder(file)
		for {
    
			var kv KeyValue
			if err := dec.Decode(&kv); err != nil {
    
				break
			}
			// select keys that this worker need to reduce
			if ihash(kv.Key) % task.NReduce == task.Serial {
    
				kva = append(kva, kv)
			}
		}
		i++
	}
	sort.Sort(ByKey(kva))
	j := 0
	for j < len(kva) {
    
		k := j + 1
		for k < len(kva) && kva[j].Key == kva[k].Key {
    
			k++
		}
		var values []string
		for u := j; u < k; u++ {
    
			values = append(values, kva[u].Value)
		}
		output := reducef(kva[j].Key, values)
		_, err := fmt.Fprintf(tempFile, "%v %v\n", kva[j].Key, output)
		if err != nil {
    
			log.Fatalf("error occurs while wrting into tempFile %v", tempFile)
		}
		j = k
	}
	err = tempFile.Close()
	if err != nil {
    
		log.Fatalf("error occurs while closing file %v", tempFile)
	}
	err = os.Rename(tempFile.Name(), "mr-out-"+strconv.Itoa(task.Serial))
	if err != nil {
    
		log.Fatalf("error occurs while renaming file %v to %s", tempFile, "mr-out-"+strconv.Itoa(task.Serial))
	}
}


//
// send an RPC request to the master, wait for the response.
// usually returns true.
// returns false if something goes wrong.
//
func call(rpcname string, args interface{
    }, reply interface{
    }) bool {
    
	// c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
	sockname := masterSock()
	c, err := rpc.DialHTTP("unix", sockname)
	if err != nil {
    
		log.Fatal("dialing:", err)
	}
	defer c.Close()

	err = c.Call(rpcname, args, reply)
	if err == nil {
    
		return true
	}

	fmt.Println(err)
	return false
}

原网站

版权声明
本文为[Ethan97]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/02/202202280602093592.html