Implementing a Basic Hadoop MapReduce Word Count with Extensible Design and Performance Tuning
This article explains Hadoop’s core concepts using a library analogy, details HDFS storage and MapReduce processing, provides complete Java implementations for a word‑count job with support for text, CSV, and JSON inputs, and discusses extensibility and performance optimizations such as combiners and custom partitioners.
Introduction
With the arrival of the mobile Internet era came the era of big data. Massive streams of information and data flow through countless systems and devices, making efficient storage and processing of that data a major challenge. Apache Hadoop, a distributed storage and computing framework, has long held an important position in big data processing thanks to its scalability and reliability.
This article draws on Hadoop’s design ideas and uses Java to build a job on its core MapReduce distributed computing model, demonstrating how parallel computation can solve massive data processing problems.
1. Understanding Hadoop
Imagine you are the director of a huge library containing thousands of books. Hadoop acts as a powerful management system that helps you store, manage, and process the information of these books.
Two key problems arise:
How to store a massive number of books (equivalent to massive data).
How to quickly find and process the information of these books (equivalent to data computation and analysis).
To achieve these goals, Hadoop introduces HDFS and MapReduce, which handle storage and processing respectively.
HDFS: The Library’s Shelves and Warehouse System
HDFS (Hadoop Distributed File System) is responsible for data storage, similar to the library’s shelves and warehouse that store all books.
Its storage characteristics include:
Distributed storage: Shelves are spread across multiple rooms (nodes), each storing a portion of the books. HDFS splits files into blocks and stores them on different nodes.
Block and fragment storage: Large books are divided into parts and stored in different rooms, enabling parallel reads and reducing storage pressure on a single node.
Redundant backup and fault tolerance: Important books are replicated across rooms so that loss of a room does not cause data loss.
Data managers – NameNode and DataNode: the NameNode acts as the library director, managing the catalog and location metadata of all books, while each DataNode works like a room manager, actually storing the book fragments.
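To make the block-and-replica idea concrete, here is a small back-of-the-envelope calculation in plain Java. The 128 MB block size and replication factor of 3 are common Hadoop defaults; the 1 GB file is purely illustrative.

```java
public class HdfsBlockMath {
    // Estimate how many blocks a file occupies in HDFS.
    // Files are split into fixed-size blocks; the last block may be partially filled.
    public static long blockCount(long fileSizeBytes, long blockSizeBytes) {
        return (fileSizeBytes + blockSizeBytes - 1) / blockSizeBytes; // ceiling division
    }

    public static void main(String[] args) {
        long blockSize = 128L * 1024 * 1024;  // 128 MB, a common default block size
        long fileSize = 1024L * 1024 * 1024;  // a 1 GB file
        int replication = 3;                  // default replication factor

        long blocks = blockCount(fileSize, blockSize);
        long storedCopies = blocks * replication;

        System.out.println(blocks);        // 8 blocks
        System.out.println(storedCopies);  // 24 block replicas spread across DataNodes
    }
}
```

Losing one DataNode therefore costs at most one copy of each affected block; the NameNode schedules re-replication from the surviving copies.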
MapReduce: The Library’s Task Allocation System
Beyond storage, the library needs to query, count, and analyze books. MapReduce splits tasks into multiple steps and distributes them to different administrators (nodes) for parallel execution.
Map Phase (Mapping)
Assume you want to know how many times each book has been borrowed. Instead of a single administrator handling all books, the task is divided among administrators of each room. Each administrator counts borrowings in its own room and emits intermediate key‑value pairs. In Hadoop, the Map phase processes data blocks in parallel and outputs intermediate key‑value pairs.
Reduce Phase (Reducing)
After the room administrators submit their intermediate results, a designated administrator aggregates them to obtain the overall borrowing statistics. (Note that in Hadoop this aggregation is done by Reduce tasks, not by the NameNode, which only manages HDFS metadata.) The Reduce phase receives the intermediate key‑value pairs from all Map tasks, grouped by key during the shuffle, aggregates them, and produces the final output.
Parallelism and Fault Tolerance
Each room administrator can work simultaneously. If an administrator is absent, the director can assign a temporary administrator. MapReduce’s main advantage is its parallel processing capability and automatic task re‑assignment when a node fails, ensuring job completion.
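The whole flow can be sketched in plain Java with ordinary collections, no Hadoop required. Each inner list below plays the role of one Map task’s input split (a room’s borrow log), the grouping step stands in for the shuffle, and the final summation is the Reduce phase. The data is made up purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class LibraryMapReduceSketch {
    public static Map<String, Integer> countBorrowings(List<List<String>> rooms) {
        // Map phase: each room independently emits (title, 1) pairs.
        List<Map.Entry<String, Integer>> intermediate = new ArrayList<>();
        for (List<String> borrowLog : rooms) {
            for (String title : borrowLog) {
                intermediate.add(Map.entry(title, 1));
            }
        }
        // Shuffle: group the intermediate pairs by key.
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : intermediate) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }
        // Reduce phase: sum the values for each key.
        Map<String, Integer> totals = new TreeMap<>();
        grouped.forEach((title, ones) ->
            totals.put(title, ones.stream().mapToInt(Integer::intValue).sum()));
        return totals;
    }

    public static void main(String[] args) {
        List<List<String>> rooms = List.of(
            List.of("Hamlet", "Dune", "Hamlet"),  // room 1's borrow log
            List.of("Dune", "Hamlet"));           // room 2's borrow log
        System.out.println(countBorrowings(rooms)); // {Dune=2, Hamlet=3}
    }
}
```

In real Hadoop, the map and reduce steps run on different machines and the shuffle moves data over the network; this sketch only shows the data flow, not the distribution.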
2. Technical Implementation
A local Hadoop installation can run in local (standalone) mode or pseudo‑distributed mode. For quick development and testing, local mode runs the entire job within a single JVM and reads from the local filesystem, with no HDFS or YARN daemons required. The project needs the following Maven dependencies:
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.3.6</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>3.3.6</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-app</artifactId>
<version>3.3.6</version>
</dependency>
</dependencies>
Implementing the MapReduce Job
Mapper (WordCountMapper)
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] words = value.toString().split("\\s+");
for (String w : words) {
if (w.isEmpty()) continue; // skip the empty token produced by leading whitespace
word.set(w);
context.write(word, one);
}
}
}
Reducer (WordCountReducer)
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
Job Driver (WordCountJob)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCountJob {
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: WordCountJob <input path> <output path>");
System.exit(-1);
}
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Word Count");
job.setJarByClass(WordCountJob.class);
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
Running the job on a sample text file produces a word‑frequency list as expected.
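One caveat worth knowing before trusting the output: splitting on whitespace alone keeps punctuation attached to words and is case‑sensitive, so "Hadoop," and "Hadoop" count as separate keys. The quick check below demonstrates the effect; the lowercase-and-strip normalization shown is one possible fix, not part of the original job.

```java
import java.util.Arrays;

public class TokenizationCheck {
    public static void main(String[] args) {
        String line = "Hadoop, hadoop! HADOOP";

        // The mapper's whitespace split keeps punctuation and preserves case.
        String[] raw = line.split("\\s+");
        System.out.println(Arrays.toString(raw)); // [Hadoop,, hadoop!, HADOOP]

        // One possible normalization: lowercase, then split on non-letter runs.
        String[] cleaned = line.toLowerCase().split("[^a-z]+");
        System.out.println(Arrays.toString(cleaned)); // [hadoop, hadoop, hadoop]
    }
}
```

If normalized counts are wanted, the same transformation would go inside the mapper before `word.set(w)`.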
Supporting More Input Formats
To handle CSV and JSON files, additional dependencies are added:
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-csv</artifactId>
<version>1.9.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.14.0</version>
</dependency>
Corresponding mappers (CSVWordCountMapper and JSONWordCountMapper) parse the respective formats and emit word counts.
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import java.io.StringReader;
public class CSVWordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
CSVParser parser = CSVFormat.DEFAULT.parse(new StringReader(line));
for (CSVRecord record : parser) {
for (String field : record) {
word.set(field.trim());
context.write(word, one);
}
}
}
}
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class JSONWordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
private ObjectMapper objectMapper = new ObjectMapper();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String jsonString = value.toString();
JsonNode jsonNode = objectMapper.readTree(jsonString);
JsonNode textNode = jsonNode.get("text"); // assumes each line is a JSON object with a "text" field
if (textNode == null) {
return; // skip records without the expected field instead of throwing a NullPointerException
}
String[] words = textNode.asText().split("\\s+");
for (String w : words) {
if (!w.isEmpty()) {
word.set(w);
context.write(word, one);
}
}
}
}
A simple factory selects the appropriate mapper based on a command‑line format argument.
import org.apache.hadoop.mapreduce.Mapper;
public class MapperFactory {
public static Class<? extends Mapper> getMapperClass(String format) {
switch (format.toLowerCase()) {
case "csv":
return CSVWordCountMapper.class;
case "json":
return JSONWordCountMapper.class;
default:
return WordCountMapper.class; // default text format
}
}
}
The driver is updated to accept a third argument (format) and set the mapper dynamically.
if (args.length != 3) {
System.err.println("Usage: WordCountJob <input path> <output path> <format>");
System.exit(-1);
}
String format = args[2];
job.setMapperClass(MapperFactory.getMapperClass(format));
Performance Tuning
Beyond functional correctness, the job can be optimized by reducing intermediate data transfer and handling data skew.
Combiner to Reduce Data Transfer
Because integer addition is associative and commutative, the reducer class can double as a combiner, pre‑aggregating counts on each map node and shrinking the data shuffled across the network:
job.setCombinerClass(WordCountReducer.class);
Custom Partitioner to Mitigate Data Skew
A custom partitioner controls how keys are distributed across reducers. The sketch below splits keys into two groups by first character; the a–m/n–z rule is crude, but it illustrates the mechanism:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class CustomPartitioner extends Partitioner<Text, IntWritable> {
@Override
public int getPartition(Text key, IntWritable value, int numReduceTasks) {
String s = key.toString().toLowerCase();
if (s.isEmpty() || numReduceTasks < 2) {
return 0; // guard against empty keys and single-reducer jobs
}
char firstChar = s.charAt(0);
return (firstChar >= 'a' && firstChar <= 'm') ? 0 : 1;
}
}
The driver registers the custom partitioner:
job.setPartitionerClass(CustomPartitioner.class);
job.setNumReduceTasks(2); // this partitioner only emits partitions 0 and 1
Conclusion
This article demonstrated how to build a basic Hadoop MapReduce word‑count job, extend it to handle multiple input formats, and apply performance optimizations such as combiners and custom partitioners. While Hadoop remains a vital tool in the big‑data ecosystem, real‑world scenarios often require further tuning and integration with other components.
Rare Earth Juejin Tech Community