Migrating Data from HBase to Kafka Using MapReduce
This article explains how to reverse the typical data flow: extract large numbers of Rowkeys from HBase with MapReduce, store them on HDFS, then use batch Get operations to retrieve the full records and write them to Kafka, retrying failed writes and monitoring progress along the way.
In many real-time applications, data is stored in an HBase cluster, but sometimes that data must be migrated back to Kafka.
Data usually flows source → Kafka → consumer (Flink, Spark, or the Kafka consumer API) → HBase; moving it in the reverse direction requires a different approach.
The article outlines a solution that leverages HBase Get/List<Get> operations and MapReduce to extract Rowkeys, store them on HDFS, and then read them back to fetch the full rows and write them to Kafka.
Key steps include:
Extracting Rowkeys using a MapReduce job with FirstKeyOnlyFilter to minimize data transfer.
Generating Rowkey files on HDFS, splitting them according to data volume.
In a second MapReduce job, reading the Rowkey files, performing batch List<Get> on HBase, and sending the records to Kafka.
Recording successful and failed Rowkeys on HDFS to enable retry of failed writes.
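The batching and bookkeeping in the last two steps can be sketched without HBase or Kafka on the classpath. The plain-Java sketch below (class and method names are hypothetical) groups Rowkeys into batches, performs one batch lookup per batch, sends each row, and records which Rowkeys succeeded and which failed, re-running only the failures for a fixed number of passes. In a real job, `fetch` would presumably wrap `Table.get(List<Get>)` and `send` would wrap a Kafka producer send whose result is awaited; the batch size and attempt count are assumptions to tune.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;
import java.util.function.Function;

// Hypothetical sketch: batch lookup + send + success/failure bookkeeping with retries.
public class RowkeyMigrationSketch {

    // Rowkeys delivered successfully and Rowkeys that still need attention.
    public static final class Outcome {
        public final List<String> succeeded = new ArrayList<>();
        public final List<String> failed = new ArrayList<>();
    }

    // One pass over the Rowkeys.
    // fetch: batch lookup returning rowkey -> row for the rows found (stand-in for Table.get(List<Get>))
    // send:  delivers one row, returns true on success (stand-in for an awaited Kafka send)
    public static Outcome migrate(List<String> rowkeys, int batchSize,
                                  Function<List<String>, Map<String, String>> fetch,
                                  BiPredicate<String, String> send) {
        if (batchSize <= 0) throw new IllegalArgumentException("batchSize must be positive");
        Outcome outcome = new Outcome();
        for (int start = 0; start < rowkeys.size(); start += batchSize) {
            List<String> batch = rowkeys.subList(start, Math.min(start + batchSize, rowkeys.size()));
            Map<String, String> rows;
            try {
                rows = fetch.apply(batch);
            } catch (RuntimeException e) {
                // The whole batch failed to load; mark every Rowkey in it for retry
                outcome.failed.addAll(batch);
                continue;
            }
            for (String rowkey : batch) {
                String row = rows.get(rowkey);
                // A missing row or a failed send both end up on the failed list
                if (row != null && send.test(rowkey, row)) {
                    outcome.succeeded.add(rowkey);
                } else {
                    outcome.failed.add(rowkey);
                }
            }
        }
        return outcome;
    }

    // Re-runs only the failed Rowkeys, up to maxAttempts passes in total.
    public static Outcome migrateWithRetry(List<String> rowkeys, int batchSize, int maxAttempts,
                                           Function<List<String>, Map<String, String>> fetch,
                                           BiPredicate<String, String> send) {
        Outcome total = new Outcome();
        List<String> pending = rowkeys;
        for (int attempt = 1; attempt <= maxAttempts && !pending.isEmpty(); attempt++) {
            Outcome pass = migrate(pending, batchSize, fetch, send);
            total.succeeded.addAll(pass.succeeded);
            pending = pass.failed;
        }
        // Whatever is still pending after the last pass is reported as failed
        total.failed.addAll(pending);
        return total;
    }
}
```

Recording a missing row as a failure is a design choice in this sketch; a real job might log rows that no longer exist separately, since retrying them cannot succeed. The succeeded and failed lists are what would be written back to HDFS so that a later run can pick up only the failures.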
The sample Java MapReduce program below (MRROW2HDFS) covers the first job: it configures the HBase connection, sets up the scan, defines the mapper and reducer, and writes the Rowkeys to HDFS.
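```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MRROW2HDFS {
    public static void main(String[] args) throws Exception {
        Configuration config = HBaseConfiguration.create(); // reads hbase-site.xml from the classpath
        Job job = Job.getInstance(config, "MRROW2HDFS");
        job.setJarByClass(MRROW2HDFS.class);
        job.setReducerClass(ROWReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        String hbaseTableName = "hbase_tbl_name";
        Scan scan = new Scan();
        scan.setCaching(1000);                    // rows fetched per RPC
        scan.setCacheBlocks(false);               // a full scan should not pollute the block cache
        scan.setFilter(new FirstKeyOnlyFilter()); // return only the first cell of each row
        TableMapReduceUtil.initTableMapperJob(hbaseTableName, scan, ROWMapper.class, Text.class, Text.class, job);
        FileOutputFormat.setOutputPath(job, new Path("/tmp/rowkey.list")); // HDFS output directory for the Rowkey files
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static class ROWMapper extends TableMapper<Text, Text> {
        private static final Text EMPTY = new Text();
        private final Text rowkey = new Text();

        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            for (Cell cell : value.rawCells()) {
                // Filter date range here if needed, e.g. skip when cell.getTimestamp() is outside the window
                rowkey.set(key.get(), key.getOffset(), key.getLength()); // assumes UTF-8 string Rowkeys
                context.write(rowkey, EMPTY);
            }
        }
    }

    public static class ROWReducer extends Reducer<Text, Text, Text, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Emit each Rowkey once; a NullWritable value stops TextOutputFormat from appending a tab
            context.write(key, NullWritable.get());
        }
    }
}
```
The process is straightforward, but pay attention to Rowkey formatting (including stray spaces and blank lines in the Rowkey files) and log successes and failures properly so the migration stays reliable.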
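When a reducer emits a `Text` value, even an empty one, `TextOutputFormat` writes a tab between key and value, so every Rowkey line ends with a tab; a Rowkey file may also contain blank lines. The hypothetical helper below is a minimal sketch, assuming Rowkeys never contain tab or newline characters, that cleans lines before they are turned into `Get` requests in the second job. It deliberately does not trim the Rowkey itself, because leading or trailing spaces may be part of a real key.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: turns raw lines of a Rowkey file into clean Rowkeys.
public class RowkeyLines {

    // Returns the Rowkey on a line, or null if the line should be skipped.
    // Assumes Rowkeys never contain tab or newline characters.
    public static String toRowkey(String line) {
        if (line == null) return null;
        // "key<TAB>value" -> keep only the part before the first tab
        int tab = line.indexOf('\t');
        String key = tab >= 0 ? line.substring(0, tab) : line;
        // Skip blank or whitespace-only lines instead of issuing a Get for them
        return key.trim().isEmpty() ? null : key;
    }

    // Cleans a batch of lines, dropping the ones that carry no Rowkey.
    public static List<String> parse(List<String> lines) {
        List<String> rowkeys = new ArrayList<>();
        for (String line : lines) {
            String key = toRowkey(line);
            if (key != null) rowkeys.add(key);
        }
        return rowkeys;
    }
}
```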
DataFunTalk
