Big Data 21 min read

Implementing a Distributed Stepwise Queue with Zookeeper for Hadoop Profit Calculation

This article demonstrates how to use Zookeeper as a distributed stepwise queue to coordinate multiple Hadoop MapReduce jobs for purchase, sales, and other cost calculations, automatically triggering a profit computation once all tasks complete, and provides full Java code examples and deployment instructions.

Architecture Digest
Implementing a Distributed Stepwise Queue with Zookeeper for Hadoop Profit Calculation

Background: multiple teams need to collaborate on Hadoop data processing, similar to a workflow; Zookeeper is used as a stepwise message middleware to coordinate purchase, sell, and other cost calculations and finally compute profit.

System design: a synchronous queue with three condition nodes (purchase, sell, other) under /queue ; when all are created, a profit node is triggered.

Environment preparation: Hadoop 2.7.3 cluster (at least one pseudo‑distributed node), Zookeeper ensemble of at least three nodes, and a Java development environment.

Purchase calculation code:

package zkqueue;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * 采购金额计算
 * @author Jon_China
 */
/**
 * Purchase amount calculation.
 *
 * Uploads a local purchase CSV to HDFS, then runs a MapReduce job that sums
 * price * quantity for all records of the target month. Input columns:
 * [0]=id, [1]=unit price, [2]=quantity, [3]=date (yyyy-MM-dd...).
 *
 * @author Jon_China
 */
public class Purchase {
    public static final String HDFS = "hdfs://192.168.8.101:9000";
    // Source rows are delimited by either a tab or a comma.
    public static final Pattern DELIMITER = Pattern.compile("[\t,]");

    /** Emits (month, price * quantity) for every record of the target month. */
    public static class PurchaseMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private String month = "2017-01";
        private Text k = new Text(month);
        private IntWritable v = new IntWritable();
        private int money = 0;

        @Override
        public void map(LongWritable key, Text values, Context context)
                throws IOException, InterruptedException {
            System.out.println(values.toString());
            String[] tokens = DELIMITER.split(values.toString()); // split the source record
            // Guard against blank/short lines before indexing column 3 (date).
            if (tokens.length > 3 && tokens[3].startsWith(month)) { // keep January data only
                money = Integer.parseInt(tokens[1]) * Integer.parseInt(tokens[2]); // price * quantity
                v.set(money);
                context.write(k, v);
            }
        }
    }

    /**
     * Sums the monthly amounts. The key is dropped on output (null key) so the
     * result file contains only the bare total, which Profit later parses as an int.
     */
    public static class PurchaseReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable v = new IntWritable();
        private int money = 0;

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            for (IntWritable line : values) {
                money += line.get();
            }
            v.set(money);
            context.write(null, v); // null key => only the numeric total is written
            System.out.println("Output:" + key + "," + money);
        }
    }

    /**
     * Copies the local CSV into a freshly re-created HDFS input directory and
     * runs the purchase job, blocking until completion.
     *
     * @param path map with keys "purchase" (local CSV), "input" and "output" (HDFS dirs)
     */
    public static void run(Map<String, String> path)
            throws IOException, InterruptedException, ClassNotFoundException {
        JobConf conf = config();
        String localData = path.get("purchase");
        String input = path.get("input");
        String output = path.get("output");

        // Re-create the HDFS input directory and upload the source file.
        HdfsDAO hdfs = new HdfsDAO(HDFS, conf);
        hdfs.rmr(input);
        hdfs.mkdirs(input);
        hdfs.copyFile(localData, input);

        Job job = Job.getInstance(conf);
        job.setJarByClass(Purchase.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setMapperClass(PurchaseMapper.class);
        job.setReducerClass(PurchaseReducer.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));
        job.waitForCompletion(true);
    }

    /** Remote Hadoop cluster configuration for the purchase job. */
    public static JobConf config() {
        JobConf conf = new JobConf(Purchase.class);
        conf.setJobName("purchase");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
        conf.addResource("classpath:/hadoop/yarn-site.xml");
        return conf;
    }

    /** Local source file and HDFS input/output locations for this job. */
    public static Map<String, String> path() {
        Map<String, String> path = new HashMap<>();
        path.put("purchase", Purchase.class.getClassLoader()
                .getResource("logfile/biz/purchase.csv").getPath()); // local source data
        path.put("input", HDFS + "/user/hdfs/biz/purchase");         // HDFS input directory
        path.put("output", HDFS + "/user/hdfs/biz/purchase/output"); // HDFS output directory
        return path;
    }

    public static void main(String[] args) throws Exception {
        run(path());
    }
}

Sell calculation code:

package zkqueue;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * 销售数据计算
 * @author Jon_China
 */
/**
 * Sales amount calculation.
 *
 * Uploads a local sales CSV to HDFS, then runs a MapReduce job that sums
 * price * quantity for all records of the target month. Input columns:
 * [0]=id, [1]=unit price, [2]=quantity, [3]=date (yyyy-MM-dd...).
 *
 * @author Jon_China
 */
public class Sell {
    public static final String HDFS = "hdfs://192.168.8.101:9000";
    // Source rows are delimited by either a tab or a comma.
    public static final Pattern DELIMITER = Pattern.compile("[\t,]");

    /** Emits (month, price * quantity) for every record of the target month. */
    public static class SellMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        // NOTE(review): originally "2013-01", which disagrees with Purchase/Other
        // ("2017-01") and would make the January-2017 sales total 0; aligned to 2017-01.
        private String month = "2017-01";
        private Text k = new Text(month);
        private IntWritable v = new IntWritable();
        private int money = 0;

        @Override
        public void map(LongWritable key, Text values, Context context)
                throws IOException, InterruptedException {
            System.out.println(values.toString());
            String[] tokens = DELIMITER.split(values.toString());
            // Guard against blank/short lines before indexing column 3 (date).
            if (tokens.length > 3 && tokens[3].startsWith(month)) { // keep January data only
                money = Integer.parseInt(tokens[1]) * Integer.parseInt(tokens[2]); // price * quantity
                v.set(money);
                context.write(k, v);
            }
        }
    }

    /**
     * Sums the monthly amounts. The key is dropped on output (null key) so the
     * result file contains only the bare total, which Profit later parses as an int.
     */
    public static class SellReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable v = new IntWritable();
        private int money = 0;

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            for (IntWritable line : values) {
                money += line.get();
            }
            v.set(money);
            context.write(null, v); // null key => only the numeric total is written
            System.out.println("Output:" + key + "," + money);
        }
    }

    /**
     * Copies the local CSV into a freshly re-created HDFS input directory and
     * runs the sell job, blocking until completion.
     *
     * @param path map with keys "sell" (local CSV), "input" and "output" (HDFS dirs)
     */
    public static void run(Map<String, String> path)
            throws IOException, InterruptedException, ClassNotFoundException {
        JobConf conf = config();
        String localData = path.get("sell");
        String input = path.get("input");
        String output = path.get("output");

        // Re-create the HDFS input directory and upload the source file.
        HdfsDAO hdfs = new HdfsDAO(HDFS, conf);
        hdfs.rmr(input);
        hdfs.mkdirs(input);
        hdfs.copyFile(localData, input);

        Job job = Job.getInstance(conf);
        job.setJarByClass(Sell.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setMapperClass(SellMapper.class);
        job.setReducerClass(SellReducer.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));
        job.waitForCompletion(true);
    }

    /**
     * Remote Hadoop cluster configuration for the sell job.
     * Fixed copy-paste bug: originally used {@code Purchase.class} and job name "purchase".
     */
    public static JobConf config() {
        JobConf conf = new JobConf(Sell.class);
        conf.setJobName("sell");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
        conf.addResource("classpath:/hadoop/yarn-site.xml");
        return conf;
    }

    /** Local source file and HDFS input/output locations for this job. */
    public static Map<String, String> path() {
        Map<String, String> path = new HashMap<>();
        path.put("sell", Sell.class.getClassLoader()
                .getResource("logfile/biz/sell.csv").getPath()); // local source data
        path.put("input", HDFS + "/user/hdfs/biz/sell");         // HDFS input directory
        path.put("output", HDFS + "/user/hdfs/biz/sell/output"); // HDFS output directory
        return path;
    }

    public static void main(String[] args) throws Exception {
        run(path());
    }
}

Other cost calculation code:

package zkqueue;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.regex.Pattern;

/**
 * Other-cost calculation: sums column 1 of a local CSV for all rows whose
 * date (column 0) falls in the target month.
 */
public class Other {
    public static String file = "/logfile/biz/other.csv";
    // Rows are delimited by either a tab or a comma.
    public static final Pattern DELIMITER = Pattern.compile("[\t,]");
    private static String month = "2017-01";

    public static void main(String[] args) throws IOException {
        calcOther(file);
    }

    /**
     * Reads the given CSV and returns the total other cost for {@link #month}.
     * Input columns: [0]=date, [1]=amount.
     *
     * @param file path to the local CSV file
     * @return summed amount for the target month
     * @throws IOException if the file cannot be read
     */
    public static int calcOther(String file) throws IOException {
        int money = 0;
        // try-with-resources: the original leaked the reader if parsing threw.
        try (BufferedReader br = new BufferedReader(new FileReader(new File(file)))) {
            String s;
            while ((s = br.readLine()) != null) {
                String[] tokens = DELIMITER.split(s);
                // Guard against blank/short lines before indexing column 1.
                if (tokens.length > 1 && tokens[0].startsWith(month)) { // January data
                    money += Integer.parseInt(tokens[1]);
                }
            }
        }
        System.out.println("Output:" + month + "," + money);
        return money;
    }
}

Profit calculation code:

package zkqueue;

import java.io.IOException;

/**
 * 利润计算
 * @author Jon_China
 */
/**
 * Profit calculation: profit = sell - purchase - other.
 *
 * Reads the purchase and sell totals back from their HDFS job outputs and
 * the "other" total from the local CSV, then prints the combined result.
 *
 * @author Jon_China
 */
public class Profit {
    public static void main(String[] args) throws Exception {
        profit();
    }

    /** Gathers the three totals and prints the resulting profit. */
    public static void profit() throws Exception {
        int sell = getSell();
        int purchase = getPurchase();
        int other = getOther();
        int result = sell - purchase - other;
        System.out.printf("profit = sell - purchase - other = %d - %d - %d = %d\n",
                sell, purchase, other, result);
    }

    /** Reads the purchase total from the job's HDFS output file. */
    public static int getPurchase() throws Exception {
        HdfsDAO hdfs = new HdfsDAO(Purchase.HDFS, Purchase.config());
        String resultFile = Purchase.path().get("output") + "/part-r-00000";
        return Integer.parseInt(hdfs.cat(resultFile).trim());
    }

    /** Reads the sell total from the job's HDFS output file. */
    public static int getSell() throws Exception {
        HdfsDAO hdfs = new HdfsDAO(Sell.HDFS, Sell.config());
        String resultFile = Sell.path().get("output") + "/part-r-00000";
        return Integer.parseInt(hdfs.cat(resultFile).trim());
    }

    /** Computes the other-cost total directly from the local CSV. */
    public static int getOther() throws IOException {
        return Other.calcOther(Other.file);
    }
}

Zookeeper task scheduling code (distributed queue):

package zkqueue;

import java.io.IOException;
import java.util.List;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

/**
 * 分布式队列zookeeper调度
 * @author Jon_China
 */
/**
 * Distributed stepwise queue scheduled through ZooKeeper.
 *
 * Three independent clients each run one task (purchase, sell, other) and
 * create a condition znode under /queue when done. Whichever client finds
 * all three condition nodes present runs the profit computation and creates
 * /queue/profit, whose creation every client watches.
 *
 * @author Jon_China
 */
public class QueueZookeeper {
    // Znode layout of the queue.
    final public static String QUEUE = "/queue";
    final public static String PROFIT = "/queue/profit";
    final public static String PURCHASE = "/queue/purchase";
    final public static String SELL = "/queue/sell";
    final public static String OTHER = "/queue/other";

    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            System.out.println("Please start a task:");
        } else {
            doAction(Integer.parseInt(args[0]));
        }
    }

    /**
     * Runs one of the three tasks, selected by client id.
     *
     * @param client 1 = purchase, 2 = sell, 3 = other
     */
    public static void doAction(int client) throws Exception {
        // ZooKeeper ensemble addresses; each client connects to a different node.
        String host1 = "192.168.8.104:2181";
        String host2 = "192.168.8.105:2181";
        String host3 = "192.168.8.106:2181";
        ZooKeeper zk = null;
        switch (client) {
            case 1:
                zk = connection(host1);
                initQueue(zk);
                doPurchase(zk);
                break;
            case 2:
                zk = connection(host2);
                initQueue(zk);
                doSell(zk);
                break;
            case 3:
                zk = connection(host3);
                initQueue(zk);
                doOther(zk);
                break;
        }
        // Note: zk is intentionally left open so the client keeps its session
        // and receives the watch event when /queue/profit is created.
    }

    /**
     * Opens a ZooKeeper session whose watcher announces queue completion
     * when the /queue/profit node appears.
     */
    public static ZooKeeper connection(String host) throws IOException {
        ZooKeeper zk = new ZooKeeper(host, 60000, new Watcher() {
            // Fires for every watched event on this session.
            public void process(WatchedEvent event) {
                if (event.getType() == Event.EventType.NodeCreated && event.getPath().equals(PROFIT)) {
                    System.out.println("Queue has Completed!!!");
                }
            }
        });
        return zk;
    }

    /**
     * Sets a watch on /queue/profit and creates the /queue parent if missing.
     */
    public static void initQueue(ZooKeeper zk) throws KeeperException, InterruptedException {
        System.out.println("WATCH => " + PROFIT);
        zk.exists(PROFIT, true); // register the one-shot NodeCreated watch
        if (zk.exists(QUEUE, false) == null) {
            System.out.println("create " + QUEUE);
            zk.create(QUEUE, QUEUE.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            System.out.println(QUEUE + " is exist!");
        }
    }

    /**
     * Runs the purchase job and marks it done with the /queue/purchase node.
     */
    public static void doPurchase(ZooKeeper zk) throws Exception {
        if (zk.exists(PURCHASE, false) == null) {
            Purchase.run(Purchase.path());
            System.out.println("create " + PURCHASE);
            zk.create(PURCHASE, PURCHASE.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            System.out.println(PURCHASE + " is exist!");
        }
        isCompleted(zk);
    }

    /**
     * Runs the sell job and marks it done with the /queue/sell node.
     */
    public static void doSell(ZooKeeper zk) throws Exception {
        if (zk.exists(SELL, false) == null) {
            Sell.run(Sell.path());
            System.out.println("create " + SELL);
            zk.create(SELL, SELL.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            System.out.println(SELL + " is exist!");
        }
        isCompleted(zk);
    }

    /**
     * Runs the other-cost calculation and marks it done with the /queue/other node.
     */
    public static void doOther(ZooKeeper zk) throws Exception {
        if (zk.exists(OTHER, false) == null) {
            Other.calcOther(Other.file);
            System.out.println("create " + OTHER);
            zk.create(OTHER, OTHER.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            System.out.println(OTHER + " is exist!");
        }
        isCompleted(zk);
    }

    /**
     * Checks whether all three condition nodes exist; if so, runs the profit
     * computation, announces completion via the ephemeral /queue/profit node,
     * and clears the queue for the next cycle.
     */
    public static void isCompleted(ZooKeeper zk) throws Exception {
        int size = 3;
        List<String> children = zk.getChildren(QUEUE, true);
        int length = children.size();
        System.out.println("Queue Complete:" + length + "/" + size);
        if (length >= size) {
            System.out.println("create " + PROFIT);
            Profit.profit();
            // EPHEMERAL: the completion marker disappears with this session,
            // so a new round can be started later.
            zk.create(PROFIT, PROFIT.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            for (String child : children) { // clear all condition nodes
                zk.delete(QUEUE + "/" + child, -1);
            }
        }
    }
}

Running result: after the three condition nodes (purchase, sell, other) are satisfied, the profit program runs automatically and prints the profit for January 2017 as -6693765.

Source code is available at the GitHub repository linked in the original article.

Tags: Java, Zookeeper, MapReduce, Hadoop, Distributed Queue
Architecture Digest
Written by

Architecture Digest

Focusing on Java backend development, covering application architecture from top-tier internet companies (high availability, high performance, high stability), big data, machine learning, Java architecture, and other popular fields.

0 followers
Reader feedback

How this landed with the community

login Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.