This article contains some notes and hints, but no complete solution.
Debug suggestions from MIT 6.824 Lab:
The lab loads the application's Map and Reduce functions as a Go plugin at runtime, so here is a short introduction to the Go `plugin` package, with a loading sketch below.
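The applications (e.g. word count in mrapps/wc.go) are built as shared objects with `go build -buildmode=plugin ../mrapps/wc.go`, and the worker loads Map and Reduce at runtime through the standard `plugin` package. The following is a minimal sketch mirroring the `loadPlugin` helper that the lab skeleton ships in mrworker.go, written here against the `KeyValue` type used later in this article:

```go
import (
    "log"
    "plugin"
)

// loadPlugin loads the application's Map and Reduce functions from a shared
// object built with: go build -buildmode=plugin ../mrapps/wc.go
func loadPlugin(filename string) (func(string, string) []KeyValue,
    func(string, []string) string) {

    p, err := plugin.Open(filename)
    if err != nil {
        log.Fatalf("cannot load plugin %v", filename)
    }

    xmapf, err := p.Lookup("Map")
    if err != nil {
        log.Fatalf("cannot find Map in %v", filename)
    }
    mapf := xmapf.(func(string, string) []KeyValue)

    xreducef, err := p.Lookup("Reduce")
    if err != nil {
        log.Fatalf("cannot find Reduce in %v", filename)
    }
    reducef := xreducef.(func(string, []string) string)

    return mapf, reducef
}
```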
Hints
- Use `os.Getpid()` as the worker id; the pid is a unique identifier for each worker process.
- How to decide the number of workers? You can use the number of input files.
- How can the coordinator wait for a while, check whether a task has finished, and reassign it to another worker if it has not?
  - A: You can use `time.After`, as in the code below. What is the difference from `time.Sleep`? For plain blocking like this, I do not think there is a real difference; `time.After` only becomes useful when the timeout has to be combined with a `select` on other channels.
```go
func (c *Coordinator) waitForTask(task *Task) {
    if task.Type != MapTask && task.Type != ReduceTask {
        return
    }

    <-time.After(time.Second * TaskTimeout)

    c.mu.Lock()
    defer c.mu.Unlock()

    if task.Status == Executing {
        task.Status = NotStarted
        task.WorkerId = -1
        // fmt.Println("Task timeout, reset task status: ", *task)
    }
}
```
- With the above, how can we run `waitForTask` in a separate goroutine? Use the `go` keyword: `go c.waitForTask(&task)`. A sketch of how this fits into a task-assignment handler follows.
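For context, here is a minimal sketch of a coordinator RPC handler that hands out a pending task and starts the timeout goroutine. The `GetTask` name, the `TaskArgs`/`TaskReply` types, and the `c.tasks` slice are assumptions made for this illustration, not part of the lab skeleton:

```go
// GetTask is an illustrative RPC handler (names and fields are assumptions):
// it picks a not-yet-started task, marks it Executing, records the worker,
// and spawns waitForTask so the task is reclaimed if the worker times out.
func (c *Coordinator) GetTask(args *TaskArgs, reply *TaskReply) error {
    c.mu.Lock()
    defer c.mu.Unlock()

    for i := range c.tasks {
        task := &c.tasks[i]
        if task.Status != NotStarted {
            continue
        }
        task.Status = Executing
        task.WorkerId = args.WorkerId
        reply.Task = *task

        // monitor in the background; waitForTask sleeps for TaskTimeout and
        // resets the task to NotStarted if it is still Executing afterwards
        go c.waitForTask(task)
        return nil
    }
    return nil
}
```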
- How to read a file?

```go
file, err := os.Open(filePath)
checkError(err, "Cannot open file %v\n", filePath)

content, err := ioutil.ReadAll(file)
checkError(err, "Cannot read file %v\n", filePath)
file.Close()
```
- How to write a file?
The worker's map task code needs a way to store intermediate key/value pairs in files so that they can be read back correctly during reduce tasks. One possibility is to use Go's `encoding/json` package. To write key/value pairs in JSON format to an open file:
```go
enc := json.NewEncoder(file)
for _, kv := range kva {
    err := enc.Encode(&kv)
    checkError(err, "Cannot encode %v to file\n", kv)
}
```
And to read such a file back:

```go
dec := json.NewDecoder(file)
kva := []KeyValue{}
for {
    var kv KeyValue
    if err := dec.Decode(&kv); err != nil {
        break
    }
    kva = append(kva, kv)
}
```

The following shows how to write the map output; writing the reduce output follows the same pattern (a sketch is given after this code block).

```go
func writeMapOutput(kva []KeyValue, mapId int) {
    // use io buffers to reduce disk I/O, which greatly improves
    // performance when running in containers with mounted volumes
    prefix := fmt.Sprintf("%v/mr-%v", TempDir, mapId)
    files := make([]*os.File, 0, nReduce)
    buffers := make([]*bufio.Writer, 0, nReduce)
    encoders := make([]*json.Encoder, 0, nReduce)

    // create temp files, use pid to uniquely identify this worker
    for i := 0; i < nReduce; i++ {
        filePath := fmt.Sprintf("%v-%v-%v", prefix, i, os.Getpid())
        file, err := os.Create(filePath)
        checkError(err, "Cannot create file %v\n", filePath)
        buf := bufio.NewWriter(file)
        files = append(files, file)
        buffers = append(buffers, buf)
        encoders = append(encoders, json.NewEncoder(buf))
    }

    // write map outputs to temp files
    for _, kv := range kva {
        idx := ihash(kv.Key) % nReduce
        err := encoders[idx].Encode(&kv)
        checkError(err, "Cannot encode %v to file\n", kv)
    }

    // flush file buffers to disk
    for i, buf := range buffers {
        err := buf.Flush()
        checkError(err, "Cannot flush buffer for file: %v\n", files[i].Name())
    }

    // atomically rename temp files to ensure no one observes partial files
    for i, file := range files {
        file.Close()
        newPath := fmt.Sprintf("%v-%v", prefix, i)
        err := os.Rename(file.Name(), newPath)
        checkError(err, "Cannot rename file %v\n", file.Name())
    }
}
```
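Writing the reduce output can reuse the same buffer-then-atomically-rename idea. Below is a minimal sketch under a few assumptions: the `writeReduceOutput` name, the `reducef` parameter, and the `kvMap` (key to list of values) argument are invented for this illustration, while `TempDir`, `checkError`, and the pid trick are the same helpers used above, and `mr-out-X` with one `"%v %v"` line per key is the output format the lab expects:

```go
// writeReduceOutput is an illustrative sketch (names are assumptions): it
// applies reducef to every key, writes "key value" lines to a temp file,
// and atomically renames the file to the final mr-out-<reduceId> name.
func writeReduceOutput(reducef func(string, []string) string,
    kvMap map[string][]string, reduceId int) {

    // sort the keys so the output is deterministic
    keys := make([]string, 0, len(kvMap))
    for k := range kvMap {
        keys = append(keys, k)
    }
    sort.Strings(keys)

    // write to a temp file first, again using the pid for uniqueness
    filePath := fmt.Sprintf("%v/mr-out-%v-%v", TempDir, reduceId, os.Getpid())
    file, err := os.Create(filePath)
    checkError(err, "Cannot create file %v\n", filePath)

    for _, k := range keys {
        _, err := fmt.Fprintf(file, "%v %v\n", k, reducef(k, kvMap[k]))
        checkError(err, "Cannot write reduce output for key %v\n", k)
    }

    // atomically rename so no one observes a partially written output file
    file.Close()
    err = os.Rename(filePath, fmt.Sprintf("mr-out-%v", reduceId))
    checkError(err, "Cannot rename file %v\n", filePath)
}
```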
Code

`/codebase/mit-6-824/lab1/mr/rpc.go`
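The linked rpc.go is not reproduced here. Purely as a hypothetical illustration, the task-assignment RPC types assumed by the `GetTask` sketch earlier might look like the following; the names and fields are invented for this article, not taken from the actual file:

```go
// Hypothetical RPC argument/reply types for task assignment; see the linked
// rpc.go for the real definitions used by the coordinator and workers.
type TaskArgs struct {
    WorkerId int // the worker's os.Getpid()
}

type TaskReply struct {
    Task    Task // the assigned map or reduce task
    NReduce int  // number of reduce partitions, needed for ihash(key) % nReduce
}
```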