Implementing a Distributed Stepwise Queue with Zookeeper for Hadoop Profit Calculation
This article demonstrates how to use Zookeeper as a distributed stepwise queue to coordinate multiple Hadoop MapReduce jobs for purchase, sales, and other cost calculations, automatically triggering a profit computation once all tasks complete, and provides full Java code examples and deployment instructions.
Background: multiple teams need to collaborate on Hadoop data processing, similar to a workflow; Zookeeper is used as a stepwise message middleware to coordinate purchase, sell, and other cost calculations and finally compute profit.
System design: a synchronized queue with three condition nodes (purchase, sell, other) under /queue; once all three have been created, a profit node is created to signal completion.
Environment preparation: Hadoop 2.7.3 cluster (at least one pseudo‑distributed node), Zookeeper ensemble of at least three nodes, and a Java development environment.
Purchase calculation code:
package zkqueue;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
/**
 * Purchase amount calculation.
 * Sums unit price * quantity over all January 2017 purchase records
 * with a single-key MapReduce job.
 * @author Jon_China
 */
public class Purchase {
    public static final String HDFS = "hdfs://192.168.8.101:9000";
    // Source rows are delimited by tab or comma; compile the pattern once.
    public static final Pattern DELIMITER = Pattern.compile("[\t,]");

    public static class PurchaseMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private String month = "2017-01";
        private Text k = new Text(month);
        private IntWritable v = new IntWritable();
        private int money = 0;

        @Override
        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            System.out.println(values.toString());
            String[] tokens = DELIMITER.split(values.toString()); // split the source record
            // Length guard: skip blank/malformed lines instead of throwing
            // ArrayIndexOutOfBoundsException and killing the task.
            if (tokens.length >= 4 && tokens[3].startsWith(month)) { // keep January data only
                money = Integer.parseInt(tokens[1]) * Integer.parseInt(tokens[2]); // unit price * quantity
                v.set(money);
                context.write(k, v);
            }
        }
    }

    public static class PurchaseReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable v = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // Local accumulator: a field would carry totals over into later keys.
            int money = 0;
            for (IntWritable line : values) {
                money += line.get();
            }
            v.set(money);
            context.write(null, v); // null key => the output file holds only the total
            System.out.println("Output:" + key + "," + money);
        }
    }

    /**
     * Stages the local purchase file into HDFS and runs the MapReduce job.
     * @param path map with keys "purchase" (local file), "input" and "output" (HDFS dirs)
     */
    public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException {
        JobConf conf = config();
        String local_data = path.get("purchase");
        String input = path.get("input");
        String output = path.get("output");
        // Reset the HDFS input directory and upload the source file.
        HdfsDAO hdfs = new HdfsDAO(HDFS, conf);
        hdfs.rmr(input);
        hdfs.mkdirs(input);
        hdfs.copyFile(local_data, input);
        Job job = Job.getInstance(conf);
        job.setJarByClass(Purchase.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setMapperClass(PurchaseMapper.class);
        job.setReducerClass(PurchaseReducer.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));
        job.waitForCompletion(true);
    }

    /** Remote Hadoop cluster configuration. */
    public static JobConf config() {
        JobConf conf = new JobConf(Purchase.class);
        conf.setJobName("purchase");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
        conf.addResource("classpath:/hadoop/yarn-site.xml");
        return conf;
    }

    /** Local source file plus HDFS input/output locations for this job. */
    public static Map<String, String> path() {
        Map<String, String> path = new HashMap<String, String>();
        path.put("purchase", Purchase.class.getClassLoader().getResource("logfile/biz/purchase.csv").getPath()); // local source data
        path.put("input", HDFS + "/user/hdfs/biz/purchase");         // HDFS staging path
        path.put("output", HDFS + "/user/hdfs/biz/purchase/output"); // HDFS output path
        return path;
    }

    public static void main(String[] args) throws Exception {
        run(path());
    }
}
Sell calculation code:
package zkqueue;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
/**
 * Sales data calculation.
 * Sums unit price * quantity over all January 2017 sales records
 * with a single-key MapReduce job.
 * @author Jon_China
 */
public class Sell {
    public static final String HDFS = "hdfs://192.168.8.101:9000";
    // Source rows are delimited by tab or comma; compile the pattern once.
    public static final Pattern DELIMITER = Pattern.compile("[\t,]");

    public static class SellMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        // Was "2013-01" — aligned with Purchase and the article's stated
        // January 2017 result so all jobs filter the same month.
        private String month = "2017-01";
        private Text k = new Text(month);
        private IntWritable v = new IntWritable();
        private int money = 0;

        @Override
        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            System.out.println(values.toString());
            String[] tokens = DELIMITER.split(values.toString());
            // Length guard: skip blank/malformed lines instead of failing the task.
            if (tokens.length >= 4 && tokens[3].startsWith(month)) { // January data only
                money = Integer.parseInt(tokens[1]) * Integer.parseInt(tokens[2]); // unit price * quantity
                v.set(money);
                context.write(k, v);
            }
        }
    }

    public static class SellReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable v = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // Local accumulator: a field would carry totals over into later keys.
            int money = 0;
            for (IntWritable line : values) {
                money += line.get();
            }
            v.set(money);
            context.write(null, v); // null key => the output file holds only the total
            System.out.println("Output:" + key + "," + money);
        }
    }

    /**
     * Stages the local sell file into HDFS and runs the MapReduce job.
     * @param path map with keys "sell" (local file), "input" and "output" (HDFS dirs)
     */
    public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException {
        JobConf conf = config();
        String local_data = path.get("sell");
        String input = path.get("input");
        String output = path.get("output");
        // Reset the HDFS input directory and upload the source file.
        HdfsDAO hdfs = new HdfsDAO(HDFS, conf);
        hdfs.rmr(input);
        hdfs.mkdirs(input);
        hdfs.copyFile(local_data, input);
        Job job = Job.getInstance(conf);
        job.setJarByClass(Sell.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setMapperClass(SellMapper.class);
        job.setReducerClass(SellReducer.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));
        job.waitForCompletion(true);
    }

    /** Remote Hadoop cluster configuration. */
    public static JobConf config() {
        // Fixed copy-paste error: this job previously configured itself with
        // Purchase.class and the job name "purchase".
        JobConf conf = new JobConf(Sell.class);
        conf.setJobName("sell");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
        conf.addResource("classpath:/hadoop/yarn-site.xml");
        return conf;
    }

    /** Local source file plus HDFS input/output locations for this job. */
    public static Map<String, String> path() {
        Map<String, String> path = new HashMap<String, String>();
        path.put("sell", Sell.class.getClassLoader().getResource("logfile/biz/sell.csv").getPath()); // local source data
        path.put("input", HDFS + "/user/hdfs/biz/sell");         // HDFS staging path
        path.put("output", HDFS + "/user/hdfs/biz/sell/output"); // HDFS output path
        return path;
    }

    public static void main(String[] args) throws Exception {
        run(path());
    }
}
Other cost calculation code:
package zkqueue;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.regex.Pattern;
public class Other {
    public static String file = "/logfile/biz/other.csv";
    // Rows are delimited by tab or comma; compile the pattern once.
    public static final Pattern DELIMITER = Pattern.compile("[\t,]");
    private static String month = "2017-01";

    public static void main(String[] args) throws IOException {
        calcOther(file);
    }

    /**
     * Sums column 1 of every row whose first column starts with the
     * configured month (January 2017).
     * @param file path of the local CSV/TSV cost file
     * @return the accumulated amount for the month
     * @throws IOException if the file cannot be opened or read
     */
    public static int calcOther(String file) throws IOException {
        int money = 0;
        // try-with-resources closes the reader even if a parse error is thrown;
        // the original only closed it on the success path.
        try (BufferedReader br = new BufferedReader(new FileReader(new File(file)))) {
            String s;
            while ((s = br.readLine()) != null) {
                String[] tokens = DELIMITER.split(s);
                // Length guard: skip blank/malformed lines.
                if (tokens.length >= 2 && tokens[0].startsWith(month)) { // January data only
                    money += Integer.parseInt(tokens[1]);
                }
            }
        }
        System.out.println("Output:" + month + "," + money);
        return money;
    }
}
Profit calculation code:
package zkqueue;
import java.io.IOException;
/**
 * Profit calculation: profit = sell - purchase - other.
 * Collects the totals produced by the two MapReduce jobs (from HDFS) and
 * the local other-cost calculation, then prints the derived profit.
 * @author Jon_China
 */
public class Profit {
    public static void main(String[] args) throws Exception {
        profit();
    }

    /** Fetches the three partial results and prints the profit formula. */
    public static void profit() throws Exception {
        final int sell = getSell();
        final int purchase = getPurchase();
        final int other = getOther();
        final int profit = sell - purchase - other;
        System.out.printf("profit = sell - purchase - other = %d - %d - %d = %d\n", sell, purchase, other, profit);
    }

    /** Reads the purchase job's total from its HDFS output file. */
    public static int getPurchase() throws Exception {
        HdfsDAO hdfs = new HdfsDAO(Purchase.HDFS, Purchase.config());
        String raw = hdfs.cat(Purchase.path().get("output") + "/part-r-00000");
        return Integer.parseInt(raw.trim());
    }

    /** Reads the sell job's total from its HDFS output file. */
    public static int getSell() throws Exception {
        HdfsDAO hdfs = new HdfsDAO(Sell.HDFS, Sell.config());
        String raw = hdfs.cat(Sell.path().get("output") + "/part-r-00000");
        return Integer.parseInt(raw.trim());
    }

    /** Recomputes the other-cost total directly from the local file. */
    public static int getOther() throws IOException {
        return Other.calcOther(Other.file);
    }
}
Zookeeper task scheduling code (distributed queue):
package zkqueue;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
/**
 * Distributed queue scheduling via ZooKeeper.
 * Each client creates its condition node under /queue after finishing its
 * task; once all three (purchase, sell, other) exist, the profit is
 * computed and the ephemeral /queue/profit node is published.
 * @author Jon_China
 */
public class QueueZookeeper {
    // Queue znode layout.
    final public static String QUEUE = "/queue";
    final public static String PROFIT = "/queue/profit";
    final public static String PURCHASE = "/queue/purchase";
    final public static String SELL = "/queue/sell";
    final public static String OTHER = "/queue/other";

    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            System.out.println("Please start a task:");
        } else {
            doAction(Integer.parseInt(args[0]));
        }
    }

    /**
     * Dispatches one of the three queue tasks.
     * @param client 1 = purchase, 2 = sell, 3 = other
     */
    public static void doAction(int client) throws Exception {
        // ZooKeeper ensemble addresses.
        String host1 = "192.168.8.104:2181";
        String host2 = "192.168.8.105:2181";
        String host3 = "192.168.8.106:2181";
        ZooKeeper zk = null;
        switch (client) { // 1, 2, 3 enqueue the different tasks
        case 1:
            zk = connection(host1);
            initQueue(zk);
            doPurchase(zk);
            break;
        case 2:
            zk = connection(host2);
            initQueue(zk);
            doSell(zk);
            break;
        case 3:
            zk = connection(host3);
            initQueue(zk);
            doOther(zk);
            break;
        }
    }

    /**
     * Opens a ZooKeeper session and blocks until it is actually connected.
     * The ZooKeeper constructor returns immediately; issuing requests before
     * the session reaches SyncConnected can fail with ConnectionLoss.
     */
    public static ZooKeeper connection(String host) throws IOException {
        final CountDownLatch connected = new CountDownLatch(1);
        ZooKeeper zk = new ZooKeeper(host, 60000, new Watcher() {
            // Observes every triggered event for this session.
            public void process(WatchedEvent event) {
                if (event.getState() == Event.KeeperState.SyncConnected) {
                    connected.countDown();
                }
                // getPath() is null for connection-state events, so compare
                // from the constant to avoid a NullPointerException.
                if (event.getType() == Event.EventType.NodeCreated && PROFIT.equals(event.getPath())) {
                    System.out.println("Queue has Completed!!!");
                }
            }
        });
        try {
            if (!connected.await(60, TimeUnit.SECONDS)) {
                throw new IOException("Timed out connecting to ZooKeeper at " + host);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status
            throw new IOException("Interrupted while connecting to " + host, e);
        }
        return zk;
    }

    /**
     * Registers the watch on the profit node and creates the queue root
     * if it does not exist yet.
     */
    public static void initQueue(ZooKeeper zk) throws KeeperException, InterruptedException {
        System.out.println("WATCH => " + PROFIT);
        zk.exists(PROFIT, true); // arm the NodeCreated watch on /queue/profit
        if (zk.exists(QUEUE, false) == null) {
            System.out.println("create " + QUEUE);
            zk.create(QUEUE, QUEUE.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            System.out.println(QUEUE + " is exist!");
        }
    }

    /**
     * Purchase task: run the MapReduce job, then mark its condition node.
     */
    public static void doPurchase(ZooKeeper zk) throws Exception {
        if (zk.exists(PURCHASE, false) == null) {
            Purchase.run(Purchase.path());
            System.out.println("create " + PURCHASE);
            zk.create(PURCHASE, PURCHASE.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            System.out.println(PURCHASE + " is exist!");
        }
        isCompleted(zk);
    }

    /**
     * Sell task: run the MapReduce job, then mark its condition node.
     */
    public static void doSell(ZooKeeper zk) throws Exception {
        if (zk.exists(SELL, false) == null) {
            Sell.run(Sell.path());
            System.out.println("create " + SELL);
            zk.create(SELL, SELL.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            System.out.println(SELL + " is exist!");
        }
        isCompleted(zk);
    }

    /**
     * Other-cost task: run the local calculation, then mark its condition node.
     */
    public static void doOther(ZooKeeper zk) throws Exception {
        if (zk.exists(OTHER, false) == null) {
            Other.calcOther(Other.file);
            System.out.println("create " + OTHER);
            zk.create(OTHER, OTHER.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            System.out.println(OTHER + " is exist!");
        }
        isCompleted(zk);
    }

    /**
     * Checks completion: if all three condition nodes exist, computes the
     * profit, publishes the ephemeral profit node, and clears the queue.
     * NOTE(review): two clients can both observe 3/3 here and race on
     * creating PROFIT — the loser gets a NodeExists error. Tolerable for
     * this demo, but a production queue should guard the create.
     */
    public static void isCompleted(ZooKeeper zk) throws Exception {
        int size = 3;
        List<String> children = zk.getChildren(QUEUE, true);
        int length = children.size();
        System.out.println("Queue Complete:" + length + "/" + size);
        if (length >= size) {
            System.out.println("create " + PROFIT);
            Profit.profit();
            zk.create(PROFIT, PROFIT.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            for (String child : children) { // clear the condition nodes
                zk.delete(QUEUE + "/" + child, -1);
            }
        }
    }
}
Running result: after the three condition nodes (purchase, sell, other) are satisfied, the profit program runs automatically and prints the profit for January 2017 as -6693765.
Source code is available at the GitHub repository linked in the original article.
Architecture Digest
Focusing on Java backend development, covering application architecture from top-tier internet companies (high availability, high performance, high stability), big data, machine learning, Java architecture, and other popular fields.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.