Big Data 8 min read

Map‑Side Join and Reduce‑Side Join Examples in Hadoop MapReduce (Java)

This article provides two reusable Java code samples that demonstrate how to perform a map‑side join and a reduce‑side join in Hadoop MapReduce, enabling efficient joining of a large dataset with a smaller reference table.

Big Data Technology & Architecture
Big Data Technology & Architecture
Big Data Technology & Architecture
Map‑Side Join and Reduce‑Side Join Examples in Hadoop MapReduce (Java)

The article shares two reusable Java code snippets that illustrate how to implement map‑side join and reduce‑side join techniques in Hadoop MapReduce, allowing a large table to be joined with a small lookup table efficiently.

Map‑Side Join

package MapJoin;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/*
 * Table1
 * 011990-99999    SIHCCAJAVRI
 * 012650-99999    TYNSET-HANSMOEN
 *
 * Table2
 * 012650-99999    194903241200    111
 * 012650-99999    194903241800    78
 * 011990-99999    195005150700    0
 * 011990-99999    195005151200    22
 * 011990-99999    195005151800    -11
 */
public class MapJoin {
    static class mapper extends Mapper<LongWritable, Text, Text, Text> {
        private Map<String, String> Table1Map = new HashMap<String, String>();

        // Load the small table into memory
        protected void setup(Context context) throws IOException {
            URI[] paths = context.getCacheFiles();
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(conf);
            FSDataInputStream fsr = fs.open(new Path(paths[0].toString()));
            String line = null;
            try {
                while ((line = fsr.readLine().toString()) != null) {
                    String[] vals = line.split("\\t");
                    if (vals.length == 2) {
                        Table1Map.put(vals[0], vals[1]);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                fsr.close();
            }
        }

        // Process the large table
        protected void map(LongWritable key, Text val, Context context) throws IOException, InterruptedException {
            String[] vals = val.toString().split("\\t");
            if (vals.length == 3) {
                String Table1Vals = Table1Map.get(vals[0]);
                Table1Vals = (Table1Vals == null) ? "" : Table1Vals;
                context.write(new Text(vals[0]), new Text(Table1Vals + "\t" + vals[1] + "\t" + vals[2]));
            }
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 3) {
            System.err.println("Parameter number is wrong, please enter three parameters:<big table hdfs input> <small table local input> <hdfs output>");
            System.exit(-1);
        }
        Job job = new Job(conf, "MapJoin");
        job.setJarByClass(MapJoin.class);
        job.setMapperClass(mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        job.addCacheFile((new Path(args[1]).toUri()));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Reduce‑Side Join

package ReduceJoin;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/*user.csv file:

"ID","NAME","SEX"
"1","user1","0"
"2","user2","0"
"3","user3","0"
"4","user4","1"
"5","user5","0"
"6","user6","0"
"7","user7","1"
"8","user8","0"
"9","user9","0"

order.csv file:

"USER_ID","NAME"
"1","order1"
"2","order2"
"3","order3"
"4","order4"
"7","order7"
"8","order8"
"9","order9"
*/
public class ReduceJoin {

    public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
        // Define reusable objects to reduce object creation in map
        private Text key = new Text();
        private Text value = new Text();
        private String[] keyValue = null;

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Split the CSV line into key and value
            keyValue = value.toString().split(",", 2);
            this.key.set(keyValue[0]); // set foreign key as MapReduce key
            this.value.set(keyValue[1]);
            context.write(this.key, this.value);
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        private Text value = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            StringBuilder valueStr = new StringBuilder();
            // Concatenate all values that share the same key
            for (Text val : values) {
                valueStr.append(val);
                valueStr.append(",");
            }
            this.value.set(valueStr.deleteCharAt(valueStr.length() - 1).toString());
            context.write(key, this.value);
        }
    }

    public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "MyJoin");
        job.setJarByClass(ReduceJoin.class);
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);
        // job.setCombinerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

The post ends with a friendly reminder that sharing and liking the article is the greatest support for the author.

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

JavaBig DataJOINMapReduceHadoop
Big Data Technology & Architecture
Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.