Building a Simple Single-Node MapReduce System: From Theory to Code
This article walks through implementing a lightweight single‑machine MapReduce framework inspired by the original MapReduce paper, covering the abstract Map/Reduce model, task scheduling between master and workers, core Go code for map, reduce, worker, and coordinator, and a brief reflection on its limitations.
1. Introduction
Lab 1 implements a minimal single-node MapReduce system based on the paper "MapReduce: Simplified Data Processing on Large Clusters". The paper's design assumes a distributed file system (GFS) for storing data; on a single machine, the local filesystem plays that role.
2. Model Abstraction
The MapReduce model defines two user‑provided functions:
Map: reads an input split and emits a list of intermediate key/value pairs.
Reduce: receives all values emitted for a given key and aggregates them (e.g., counts occurrences) to produce the final result; the paper delivers the values through an iterator so that value lists too large to fit in memory can still be processed. (The Go shapes of both functions are sketched below.)
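For reference, the lab's Go skeleton expresses this contract with a small key/value type; the shapes below are the ones the rest of this article builds on:

// KeyValue is the unit of data exchanged between the Map and Reduce phases.
type KeyValue struct {
	Key   string
	Value string
}

// The application supplies the two functions (the worker loads them as a plugin):
//
//	Map(filename string, contents string) []KeyValue
//	Reduce(key string, values []string) string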
3. Execution Process
The system consists of a single coordinator (master) and multiple workers. The workflow proceeds in four phases:
Map task assignment: the coordinator splits the input data into M map tasks. Workers request a map task via RPC (the message types are sketched after this list).
Map phase: each worker runs the user-provided Map function, writes the resulting key/value pairs to local intermediate files, and reports the filenames to the coordinator.
Reduce task assignment: after all map tasks finish, workers request reduce tasks.
Reduce phase: workers read the assigned intermediate files, run the Reduce function on each key's value list, and write the aggregated result to a final output file.
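A minimal sketch of the RPC messages this workflow implies. The field names (TaskId, TaskType, Filenames, ReduceNum, Version) match the worker and coordinator code later in this article, but the exact definitions here are assumptions reconstructed from that code:

type TaskType int

const (
	Free       TaskType = iota // no task available right now; worker backs off
	MapFunc                    // run Map on Filenames[0]
	ReduceFunc                 // run Reduce over all Filenames
	Finish                     // the whole job is done; worker may exit
)

// RpcRequest is empty: a worker simply asks for work.
type RpcRequest struct{}

// RpcReply carries everything a worker needs to execute one task.
type RpcReply struct {
	TaskId    uint     // index of the task
	TaskType  TaskType // what kind of work to do
	Filenames []string // input split (map) or intermediate files (reduce)
	ReduceNum int      // number of reduce partitions, used to hash keys
	Version   uint     // bumped on reassignment so stale results can be rejected
}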
4. Word‑Count Example
The lab provides a set of text files named pg-*.txt, each representing an e‑book. The goal is to count the total occurrences of every word across all books.
4.1 Map Function
func Map(filename string, contents string) []mr.KeyValue {
	// Detect non-letter runes as word separators.
	ff := func(r rune) bool { return !unicode.IsLetter(r) }
	// Split the file contents into words.
	words := strings.FieldsFunc(contents, ff)
	kva := []mr.KeyValue{}
	for _, w := range words {
		kv := mr.KeyValue{Key: w, Value: "1"}
		kva = append(kva, kv)
	}
	return kva
}

The function emits a (word, "1") pair for every word encountered.
4.2 Reduce Function
func Reduce(key string, values []string) string {
	// The count of occurrences is the length of the values slice.
	return strconv.Itoa(len(values))
}

The function returns the total count for the given word.
5. Worker Logic
Workers continuously request tasks from the coordinator, execute the assigned function, and synchronize results back.
func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) {
	for {
		reply := getCallResp() // RPC request to coordinator
		switch reply.TaskType {
		case Finish:
			return
		case Free:
			// No task available yet; back off briefly before asking again.
			time.Sleep(100 * time.Millisecond)
		case MapFunc:
			results := handleMapFunc(mapf, reply.TaskId, reply.Filenames[0], reply.ReduceNum)
			syncMapReduceResult(reply.TaskId, reply.TaskType, reply.Version, results)
		case ReduceFunc:
			results := handleReduceFunc(reducef, reply.TaskId, reply.Filenames)
			syncMapReduceResult(reply.TaskId, reply.TaskType, reply.Version, results)
		}
	}
}

Key points for map handling:
Keys are hashed modulo ReduceNum to determine the target intermediate file.
Intermediate files should be created with os.CreateTemp (or the older ioutil.TempFile) and renamed into place, to avoid name collisions and half-written files; a handleMapFunc sketch follows.
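A minimal sketch of what handleMapFunc could look like under these constraints. It assumes the lab skeleton's ihash helper (an FNV-1a hash of the key) and uses JSON to encode intermediate pairs, which is one common choice rather than a requirement:

import (
	"encoding/json"
	"fmt"
	"log"
	"os"
)

// handleMapFunc runs mapf on one input file and partitions its output
// into reduceNum intermediate files named mr-<taskId>-<r>.
func handleMapFunc(mapf func(string, string) []KeyValue,
	taskId uint, filename string, reduceNum int) []string {

	contents, err := os.ReadFile(filename)
	if err != nil {
		log.Fatalf("cannot read %v: %v", filename, err)
	}

	// Partition the emitted pairs by hash of the key.
	buckets := make([][]KeyValue, reduceNum)
	for _, kv := range mapf(filename, string(contents)) {
		r := ihash(kv.Key) % reduceNum // ihash comes from the lab skeleton
		buckets[r] = append(buckets[r], kv)
	}

	// Write each bucket to a temp file, then rename it into place so a
	// crashing worker never leaves a half-written intermediate file.
	names := make([]string, 0, reduceNum)
	for r, bucket := range buckets {
		tmp, err := os.CreateTemp(".", "mr-map-*")
		if err != nil {
			log.Fatalf("cannot create temp file: %v", err)
		}
		enc := json.NewEncoder(tmp)
		for _, kv := range bucket {
			if err := enc.Encode(&kv); err != nil {
				log.Fatalf("cannot encode %v: %v", kv, err)
			}
		}
		tmp.Close()
		name := fmt.Sprintf("mr-%d-%d", taskId, r)
		os.Rename(tmp.Name(), name)
		names = append(names, name)
	}
	return names
}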
Key points for reduce handling:
Workers read the list of intermediate files assigned to the reduce task, aggregate the values for each key, and write the result to a temporary file; a handleReduceFunc sketch follows this list.
The coordinator renames the temporary file to the final output name after verification.
MapReduce relies on the atomic rename operation of the underlying file system to guarantee that each reduce task's output is stored exactly once, even if the task executes more than once.
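Correspondingly, a minimal handleReduceFunc might decode every intermediate file, sort the pairs so that equal keys become adjacent (the same grouping trick the lab's sequential reference implementation uses), and write one line per key to a temporary output file whose name is reported back:

import (
	"encoding/json"
	"fmt"
	"log"
	"os"
	"sort"
)

// handleReduceFunc aggregates one reduce partition and returns the name
// of the temporary file holding its output.
func handleReduceFunc(reducef func(string, []string) string,
	taskId uint, filenames []string) []string {

	// Decode every key/value pair from every intermediate file.
	kva := []KeyValue{}
	for _, name := range filenames {
		f, err := os.Open(name)
		if err != nil {
			log.Fatalf("cannot open %v: %v", name, err)
		}
		dec := json.NewDecoder(f)
		for {
			var kv KeyValue
			if err := dec.Decode(&kv); err != nil {
				break // io.EOF: finished this file
			}
			kva = append(kva, kv)
		}
		f.Close()
	}

	// Sorting makes all pairs with the same key adjacent.
	sort.Slice(kva, func(i, j int) bool { return kva[i].Key < kva[j].Key })

	tmp, err := os.CreateTemp(".", "mr-out-*")
	if err != nil {
		log.Fatalf("cannot create temp file: %v", err)
	}
	for i := 0; i < len(kva); {
		// Collect the run of values sharing kva[i].Key.
		j := i
		for j < len(kva) && kva[j].Key == kva[i].Key {
			j++
		}
		values := make([]string, 0, j-i)
		for k := i; k < j; k++ {
			values = append(values, kva[k].Value)
		}
		fmt.Fprintf(tmp, "%v %v\n", kva[i].Key, reducef(kva[i].Key, values))
		i = j
	}
	tmp.Close()
	// The coordinator later renames this to the final mr-out-<taskId>.
	return []string{tmp.Name()}
}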
6. Coordinator (Master) Logic
The coordinator is responsible for task dispatch, timeout monitoring, and stage transitions.
6.1 Task Scheduling RPC
func (c *Coordinator) TaskSchedule(request *RpcRequest, reply *RpcReply) error {
	c.lock.Lock()
	defer c.lock.Unlock()
	switch c.stage {
	case MapStage:
		if task := c.dispatchMapTask(); task != nil {
			reply.TaskId = uint(task.Index)
			reply.TaskType = MapFunc
			reply.Filenames = []string{task.Files} // a map task has a single input file
			reply.ReduceNum = c.reduceNum
			reply.Version = task.Version
			go c.monitorTaskTimeout(task)
		} else {
			reply.TaskType = Free // no map task available
		}
	case ReduceStage:
		if task := c.dispatchReduceTask(); task != nil {
			reply.TaskId = uint(task.Index)
			reply.TaskType = ReduceFunc
			reply.Filenames = task.Files
			reply.ReduceNum = c.reduceNum
			reply.Version = task.Version
			go c.monitorTaskTimeout(task)
		} else {
			reply.TaskType = Free // no reduce task available
		}
	case CoordinatorDone:
		reply.TaskType = Finish
	}
	return nil
}

6.2 Timeout Monitoring
func (c *Coordinator) monitorTaskTimeout(task interface{}) {
	switch t := task.(type) {
	case *mapTask:
		select {
		case <-time.After(10 * time.Second):
			c.lock.Lock()
			t.State = Idle // make the task eligible for reassignment
			t.Version++    // invalidate results from the slow worker
			c.lock.Unlock()
		case <-t.done:
			return
		}
	case *reduceTask:
		select {
		case <-time.After(10 * time.Second):
			c.lock.Lock()
			t.State = Idle
			t.Version++
			c.lock.Unlock()
		case <-t.done:
			return
		}
	}
}

6.3 Stage Transition
func (c *Coordinator) updateStage() {
	// c.cond is constructed with c.cond.L == &c.lock, so waiting here
	// releases the same mutex the RPC handlers use.
	c.cond.L.Lock()
	for !c.checkAllMapTaskFinished() {
		c.cond.Wait() // woken by Broadcast when a task result is accepted
	}
	c.stage = ReduceStage
	c.prepareReduceTasks()
	for !c.checkAllReduceTaskFinished() {
		c.cond.Wait()
	}
	c.stage = CoordinatorDone
	c.cond.L.Unlock()
}

In a production system, additional mechanisms such as checkpointing, persistent state storage, and backup execution would be required. This lab only implements timeout-based task reassignment. The sketch below shows how the Version counter from 6.2 and the condition variable used here fit together when a worker reports a result.
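The following coordinator-side handler is a sketch of what the worker's syncMapReduceResult would call. SyncRequest, taskOf, recordResult, and the Completed state are assumed names; the essential idea is the version comparison that discards results from attempts that have already been timed out and reassigned:

// Hypothetical request type mirroring what syncMapReduceResult sends.
type SyncRequest struct {
	TaskId   uint
	TaskType TaskType
	Version  uint
	Files    []string // filenames produced by the task
}

type SyncReply struct{}

// SyncResult accepts a worker's result only if its version still matches.
func (c *Coordinator) SyncResult(req *SyncRequest, reply *SyncReply) error {
	c.lock.Lock()
	defer c.lock.Unlock()

	t := c.taskOf(req.TaskType, req.TaskId) // hypothetical task lookup
	if t.Version != req.Version || t.State == Completed {
		return nil // stale or duplicate result from a timed-out attempt: drop it
	}
	t.State = Completed
	close(t.done)       // stop this task's timeout monitor
	c.recordResult(req) // hypothetical: remember intermediate/output filenames
	c.cond.Broadcast()  // wake updateStage to re-check stage completion
	return nil
}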
7. Reflections on the Original Design
The original MapReduce design introduced a single coordinator, which creates several practical limitations:
The coordinator is a single point of failure; a crash requires manual intervention and halts the entire job.
As the number of workers grows, the coordinator becomes a bottleneck for task distribution and state management.
Overall fault tolerance is limited, motivating later systems to adopt consensus protocols (e.g., Paxos, Raft) for replicated coordination.