Skip to content

Latest commit

 

History

History
558 lines (499 loc) · 10.7 KB

lab1_core.md

File metadata and controls

558 lines (499 loc) · 10.7 KB

因为官方不让上传该实验的代码故把核心代码写在markdown里面保存下:

//master
package mr

import (
	"fmt"
	"log"
	"net"
	"net/http"
	"net/rpc"
	"os"
	"regexp"
	"sync"
	"time"
)

// Task Type
type TaskType int

const (
	Type_Map TaskType = iota
	Type_Reduce
)

type TaskState int

const (
	Waiting TaskState = iota
	Running
	finished
)

type GlobalState int

const (
	Global_Map GlobalState = iota
	Global_Reduce
	Global_Done
)

type Task struct {
	TID       int
	Type      TaskType
	State     TaskState
	Input     []string
	ReduceTh  int // only for reduce
	RSzie     int
	StartTime time.Time
}

type Master struct {
	// Your definitions here.
	State      GlobalState
	TaskCnt    int
	MapTask    chan *Task
	ReduceTask chan *Task
	TaskMap    map[int]*Task
	MapSize    int
	ReduceSize int
}

// sync lock
var mu sync.Mutex

// Your code here -- RPC handlers for the worker to call.
func (m *Master) AssginTask(args *TaskArgs, reply *TaskReply) error {
	mu.Lock()
	defer mu.Unlock()
	fmt.Println("AssginTask ......")
	//core to switch m/r/f and process workers
	switch m.State {
	case Global_Map:
		fmt.Println("map..........")
		if len(m.MapTask) > 0 {
			task := <-m.MapTask
			task.State = Running
			task.StartTime = time.Now()
			m.TaskMap[task.TID] = task
			reply.CanDo = Do
			reply.Task = *task
		} else {
			reply.CanDo = NoWork
			//check can switch to next phase
			if m.checkMap() {
				m.nextPhase()
			}
		}
	case Global_Reduce:
		fmt.Println("Reduce..........")
		if len(m.ReduceTask) > 0 {
			task := <-m.ReduceTask
			task.State = Running
			task.StartTime = time.Now()
			m.TaskMap[task.TID] = task
			reply.CanDo = Do
			reply.Task = *task
		} else {
			reply.CanDo = NoWork
			//check can switch to next phase
			if m.checkReduce() {
				m.nextPhase()
			}
		}
	case Global_Done:
		fmt.Println("Done..........")
		reply.CanDo = Exit
	default:
		panic("The phase undefined!")
	}

	return nil
}

func (m *Master) ReciveHeart(args *WorkStateArgs, reply *WorkStateReply) error {
	if args.WorkHeart == WorkDone {
		if m.TaskMap[args.TID] != nil {
			m.TaskMap[args.TID].State = finished
		}
	}
	return nil
}
func (m *Master) checkMap() bool {
	ans := true
	for i := 0; i < m.TaskCnt; i++ {
		if m.TaskMap[i] == nil {
			ans = false
			break
		}
		if m.TaskMap[i].State != finished {
			ans = false
			break
		}
	}
	fmt.Printf("this map ans: %v\n", ans)
	return ans
}

func (m *Master) checkReduce() bool {
	ans := true
	for i := m.MapSize; i < m.TaskCnt; i++ {
		if m.TaskMap[i] == nil {
			ans = false
			break
		}
		if m.TaskMap[i].State != finished {
			ans = false
			break
		}
	}
	fmt.Printf("this reduce ans: %v\n", ans)
	return ans
}

func (m *Master) nextPhase() {
	fmt.Printf("finished this phase : %v\n", m.State)
	if m.State == Global_Map {
		m.initReduce()
		m.State = Global_Reduce
	} else if m.State == Global_Reduce {
		m.State = Global_Done
	}
}

// an example RPC handler.
//
// the RPC argument and reply types are defined in rpc.go.
func (m *Master) Example(args *ExampleArgs, reply *ExampleReply) error {
	reply.Y = args.X + 1
	return nil
}

// start a thread that listens for RPCs from worker.go
func (m *Master) server() {
	rpc.Register(m)
	rpc.HandleHTTP()
	//l, e := net.Listen("tcp", ":1234")
	sockname := masterSock()
	os.Remove(sockname)
	l, e := net.Listen("unix", sockname)
	if e != nil {
		log.Fatal("listen error:", e)
	}
	go http.Serve(l, nil)
}

// main/mrmaster.go calls Done() periodically to find out
// if the entire job has finished.
func (m *Master) Done() bool {
	mu.Lock()
	defer mu.Unlock()
	ret := m.State == Global_Done
	return ret
}

// create TID
func (m *Master) CreateTID() int {
	res := m.TaskCnt
	m.TaskCnt++
	return res
}

func (m *Master) initMap(files []string) {
	fmt.Println("map task init.......")
	for _, file := range files {
		mapTask := Task{
			TID:   m.CreateTID(),
			State: Waiting,
			Type:  Type_Map,
			Input: []string{file},
			RSzie: m.ReduceSize,
		}
		fmt.Printf("make a map task: %v\n", mapTask.TID)
		m.MapTask <- &mapTask
	}
}

func (m *Master) initReduce() {
	fmt.Println("reduce task init.......")
	dir, _ := os.Getwd()
	files, err := os.ReadDir(dir)
	if err != nil {
		fmt.Println(err)
	}
	for i := 0; i < m.ReduceSize; i++ {
		// mr-X-Y
		input := []string{}
		for _, file := range files {
			pattern := fmt.Sprintf("^mr-.+-%v$", i)
			matched, _ := regexp.MatchString(pattern, file.Name())
			if matched {
				input = append(input, file.Name())
			}
		}
		fmt.Println(input)
		reduceTask := Task{
			TID:      m.CreateTID(),
			Type:     Type_Reduce,
			State:    Waiting,
			Input:    input,
			ReduceTh: i,
			RSzie:    m.ReduceSize,
		}
		fmt.Printf("make a reduce task: %v\n", reduceTask.ReduceTh)
		m.ReduceTask <- &reduceTask
	}
}

// create a Master.
// main/mrmaster.go calls this function.
// nReduce is the number of reduce tasks to use.
func MakeMaster(files []string, nReduce int) *Master {
	m := Master{
		State:      Global_Map,
		TaskCnt:    0,
		MapTask:    make(chan *Task, len(files)),
		ReduceTask: make(chan *Task, nReduce),
		TaskMap:    make(map[int]*Task, len(files)+nReduce),
		ReduceSize: nReduce,
	}

	// Your code here.
	m.initMap(files)
	m.server()
	m.CrashHandle()
	return &m
}

func (m *Master) CrashHandle() {
	for {
		time.Sleep(time.Second * 2)
		mu.Lock()
		if m.State == Global_Done {
			mu.Unlock()
			break
		}
		for _, task := range m.TaskMap {
			if task.State == Running && time.Since(task.StartTime) >= 10*time.Second {
				fmt.Printf("is crash : %v\n", task.TID)
				task.State = Waiting
				if task.Type == Type_Map {
					m.MapTask <- task
				} else {
					m.ReduceTask <- task
				}
				delete(m.TaskMap, task.TID)
			}
		}
		mu.Unlock()
	}
}
//woker
package mr

import (
	"encoding/json"
	"fmt"
	"hash/fnv"
	"io"
	"log"
	"net/rpc"
	"os"
	"sort"
	"time"
)

// Map functions return a slice of KeyValue.
type KeyValue struct {
	Key   string
	Value string
}
type ByKey []KeyValue

func (a ByKey) Len() int           { return len(a) }
func (a ByKey) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }

// use ihash(key) % NReduce to choose the reduce
// task number for each KeyValue emitted by Map.
func ihash(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() & 0x7fffffff)
}

// main/mrworker.go calls this function.
func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {
	flag := true
	for flag {
		args := TaskArgs{}
		reply := TaskReply{}
		call("Master.AssginTask", &args, &reply)
		switch reply.CanDo {
		case Exit:
			flag = false
		case NoWork:
			fmt.Println("NoWork.....")
			time.Sleep(time.Second)
		case Do:
			task := &reply.Task
			fmt.Println(reply)
			if task.Type == Type_Map {
				doMapTask(mapf, task)
				CallFinined(task.TID)
			} else if task.Type == Type_Reduce {
				doReduceTask(reducef, task)
				CallFinined(task.TID)
			}
		default:
			panic("bad args!")
		}
	}
}

func doMapTask(mapf func(string, string) []KeyValue, task *Task) {
	intermediate := []KeyValue{}
	for _, filename := range task.Input {
		file, err := os.Open(filename)
		if err != nil {
			log.Fatalf("cannot open %v", filename)
		}
		context, err := io.ReadAll(file)
		if err != nil {
			log.Fatalf("cannot read %v", filename)
		}
		file.Close()
		kva := mapf(filename, string(context))
		intermediate = append(intermediate, kva...)
	}

	//write into tmp file mr-x-y
	for i := 0; i < task.RSzie; i++ {
		filename := fmt.Sprintf("mr-%v-%v", task.TID, i)
		tmpfile, err := os.Create(filename)
		if err != nil {
			log.Fatal(err)
		}
		fmt.Println(filename)
		encoder := json.NewEncoder(tmpfile)
		for _, kv := range intermediate {
			if ihash(kv.Key)%task.RSzie == i {
				encoder.Encode(kv)
			}
		}
		tmpfile.Close()
	}
}

func doReduceTask(reducef func(string, []string) string, task *Task) {
	fmt.Println("this is reduce")
	intermediate := shuffe(task.Input)
	dir, _ := os.Getwd()
	tmpfile, err := os.CreateTemp(dir, "mr-out-tmpfile-")
	if err != nil {
		log.Fatal(err)
	}
	i := 0
	for i < len(intermediate) {
		j := i + 1
		for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
			j++
		}
		values := []string{}
		for k := i; k < j; k++ {
			values = append(values, intermediate[k].Value)
		}
		output := reducef(intermediate[i].Key, values)

		// this is the correct format for each line of Reduce output.
		fmt.Fprintf(tmpfile, "%v %v\n", intermediate[i].Key, output)
		i = j
	}
	oname := fmt.Sprintf("mr-out-%v", task.ReduceTh)
	tmpfile.Close()
	os.Rename(tmpfile.Name(), oname)
}

func shuffe(files []string) []KeyValue {
	kva := []KeyValue{}
	for _, file := range files {
		tmpfile, err := os.Open(file)
		if err != nil {
			log.Fatal(err)
		}
		decoder := json.NewDecoder(tmpfile)
		for {
			var kv KeyValue
			err := decoder.Decode(&kv)
			if err != nil { //eof
				break
			}
			kva = append(kva, kv)
		}
		tmpfile.Close()
	}

	sort.Sort(ByKey(kva))
	return kva
}
func CallFinined(TID int) {
	args := WorkStateArgs{WorkDone, TID}
	reply := WorkStateReply{}
	call("Master.ReciveHeart", &args, &reply)
}

// example function to show how to make an RPC call to the master.
//
// the RPC argument and reply types are defined in rpc.go.
func CallExample() {

	// declare an argument structure.
	args := ExampleArgs{}

	// fill in the argument(s).
	args.X = 99

	// declare a reply structure.
	reply := ExampleReply{}

	// send the RPC request, wait for the reply.
	call("Master.Example", &args, &reply)

	// reply.Y should be 100.
	fmt.Printf("reply.Y %v\n", reply.Y)
}

// send an RPC request to the master, wait for the response.
// usually returns true.
// returns false if something goes wrong.
func call(rpcname string, args interface{}, reply interface{}) bool {
	// c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
	sockname := masterSock()
	c, err := rpc.DialHTTP("unix", sockname)
	if err != nil {
		log.Fatal("dialing:", err)
	}
	defer c.Close()

	err = c.Call(rpcname, args, reply)
	if err == nil {
		return true
	}

	fmt.Println(err)
	return false
}
//rpc
package mr

//
// RPC definitions.
//
// remember to capitalize all names.
//

import "os"
import "strconv"

//
// example to show how to declare the arguments
// and reply for an RPC.
//
type CallState int 

const (
	Exit CallState = iota 
	NoWork 
	Do
)

type WorkState int 

const (
	Working WorkState = iota
	WorkDone
)
type ExampleArgs struct {
	X int
}

type ExampleReply struct {
	Y int
}

// Add your RPC definitions here.
type TaskArgs struct {
}

type TaskReply struct {
	CanDo CallState
	Task Task
}

type WorkStateArgs struct {
	WorkHeart WorkState
	TID int 
}

type WorkStateReply struct {}
// Cook up a unique-ish UNIX-domain socket name
// in /var/tmp, for the master.
// Can't use the current directory since
// Athena AFS doesn't support UNIX-domain sockets.
func masterSock() string {
	s := "/var/tmp/824-mr-"
	s += strconv.Itoa(os.Getuid())
	return s
}