Map‑Side Join and Reduce‑Side Join Examples in Hadoop MapReduce (Java)
This article provides two reusable Java code samples that demonstrate how to perform a map‑side join and a reduce‑side join in Hadoop MapReduce, enabling efficient joining of a large dataset with a smaller reference table.
The article shares two reusable Java code snippets that illustrate how to implement map‑side join and reduce‑side join techniques in Hadoop MapReduce, allowing a large table to be joined with a small lookup table efficiently.
Map‑Side Join
package MapJoin;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
/*
* Table1
* 011990-99999 SIHCCAJAVRI
* 012650-99999 TYNSET-HANSMOEN
*
* Table2
* 012650-99999 194903241200 111
* 012650-99999 194903241800 78
* 011990-99999 195005150700 0
* 011990-99999 195005151200 22
* 011990-99999 195005151800 -11
*/
public class MapJoin {
static class mapper extends Mapper<LongWritable, Text, Text, Text> {
private Map<String, String> Table1Map = new HashMap<String, String>();
// Load the small table into memory
protected void setup(Context context) throws IOException {
URI[] paths = context.getCacheFiles();
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
FSDataInputStream fsr = fs.open(new Path(paths[0].toString()));
String line = null;
try {
while ((line = fsr.readLine().toString()) != null) {
String[] vals = line.split("\\t");
if (vals.length == 2) {
Table1Map.put(vals[0], vals[1]);
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
fsr.close();
}
}
// Process the large table
protected void map(LongWritable key, Text val, Context context) throws IOException, InterruptedException {
String[] vals = val.toString().split("\\t");
if (vals.length == 3) {
String Table1Vals = Table1Map.get(vals[0]);
Table1Vals = (Table1Vals == null) ? "" : Table1Vals;
context.write(new Text(vals[0]), new Text(Table1Vals + "\t" + vals[1] + "\t" + vals[2]));
}
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 3) {
System.err.println("Parameter number is wrong, please enter three parameters:<big table hdfs input> <small table local input> <hdfs output>");
System.exit(-1);
}
Job job = new Job(conf, "MapJoin");
job.setJarByClass(MapJoin.class);
job.setMapperClass(mapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
job.addCacheFile((new Path(args[1]).toUri()));
FileOutputFormat.setOutputPath(job, new Path(args[2]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}Reduce‑Side Join
package ReduceJoin;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/*user.csv file:
"ID","NAME","SEX"
"1","user1","0"
"2","user2","0"
"3","user3","0"
"4","user4","1"
"5","user5","0"
"6","user6","0"
"7","user7","1"
"8","user8","0"
"9","user9","0"
order.csv file:
"USER_ID","NAME"
"1","order1"
"2","order2"
"3","order3"
"4","order4"
"7","order7"
"8","order8"
"9","order9"
*/
public class ReduceJoin {
public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
// Define reusable objects to reduce object creation in map
private Text key = new Text();
private Text value = new Text();
private String[] keyValue = null;
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// Split the CSV line into key and value
keyValue = value.toString().split(",", 2);
this.key.set(keyValue[0]); // set foreign key as MapReduce key
this.value.set(keyValue[1]);
context.write(this.key, this.value);
}
}
public static class Reduce extends Reducer<Text, Text, Text, Text> {
private Text value = new Text();
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
StringBuilder valueStr = new StringBuilder();
// Concatenate all values that share the same key
for (Text val : values) {
valueStr.append(val);
valueStr.append(",");
}
this.value.set(valueStr.deleteCharAt(valueStr.length() - 1).toString());
context.write(key, this.value);
}
}
public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = new Job(conf, "MyJoin");
job.setJarByClass(ReduceJoin.class);
job.setMapperClass(MapClass.class);
job.setReducerClass(Reduce.class);
// job.setCombinerClass(Reduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}The post ends with a friendly reminder that sharing and liking the article is the greatest support for the author.
Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactand we will review it promptly.
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
