How to Sync Billions of MySQL Records to HBase: 3 Powerful Methods Using Hadoop, Kafka, and Flink

This guide walks through setting up a pseudo‑distributed Hadoop environment, loading large volumes of data into MySQL with LOAD DATA, Python batch scripts, and parallel worker processes, and then synchronizing that data to HBase in three ways: Sqoop, a Kafka‑Thrift pipeline, and a real‑time Kafka‑Flink pipeline. It closes with a comparison of query performance in HBase and Phoenix.

Programmer DD

1. Introduction

Learn how to transfer billions of rows from MySQL to HBase using three different synchronization schemes and practical implementations.

2. Environment Preparation

Deploy a pseudo‑distributed Hadoop cluster on Ubuntu 16.04 and install the required components: Hadoop, HBase, Zookeeper, Phoenix, Kafka, Maxwell, and Flink.

2.1 Install Java

sudo apt-get install openjdk-8-jdk

2.2 Create Hadoop User

sudo groupadd hadoop
sudo useradd -s /bin/bash -g hadoop -d /home/hadoop -m hadoop

2.3 Install Hadoop

wget https://mirrors.cnnic.cn/apache/hadoop/common/hadoop-3.0.2/hadoop-3.0.2.tar.gz
tar zxvf hadoop-3.0.2.tar.gz
sudo mv hadoop-3.0.2 /usr/local/hadoop

2.4 Configure core‑site.xml and hdfs‑site.xml

<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://localhost:9012</value>
  </property>
</configuration>

2.5 Start Hadoop

cd /usr/local/hadoop/sbin
./start-dfs.sh
./start-yarn.sh

2.6 Install and Configure HBase

wget https://mirrors.cnnic.cn/apache/hbase/stable/hbase-1.4.9-bin.tar.gz
tar zxvf hbase-1.4.9-bin.tar.gz
sudo mv hbase-1.4.9 /usr/local/hbase

Set the HBase root directory and the ZooKeeper data directory in conf/hbase-site.xml:

<configuration>
  <property>
    <name>hbase.rootdir</name>
    <value>hdfs://localhost:9012/hbase</value>
  </property>
  <property>
    <name>hbase.zookeeper.property.dataDir</name>
    <value>/home/hadoop/zookeeper</value>
  </property>
</configuration>

Then start HBase from /usr/local/hbase:

bin/start-hbase.sh

2.7 Install Phoenix

tar -xvf apache-phoenix-4.14.2-HBase-1.4-bin.tar.gz
sudo mv apache-phoenix-4.14.2-HBase-1.4-bin /usr/local/phoenix

2.8 Install Kafka and Zookeeper

wget http://kafka.apache.org/downloads/kafka_2.12-2.8.0.tgz
tar -xzf kafka_2.12-2.8.0.tgz
sudo mv kafka_2.12-2.8.0 /usr/local/kafka
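cd /usr/local/kafka
# -daemon runs each server in the background so the shell is not blocked
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties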

2.9 Install Maxwell (MySQL binlog to Kafka)

./maxwell/bin/maxwell --user='maxwell' --password='maxwell' --host='127.0.0.1' --producer=kafka --kafka.bootstrap.servers=localhost:9092 --kafka_topic=test

2.10 Install Flink

wget https://downloads.apache.org/flink/flink-1.13.2/flink-1.13.2-bin-scala_2.12.tgz
tar -zxf flink-1.13.2-bin-scala_2.12.tgz
mv flink-1.13.2 /usr/local/flink
cd /usr/local/flink/bin
./start-cluster.sh

3. MySQL Data Insertion Methods

Three ways to load data into MySQL are demonstrated.

3.1 LOAD DATA INFILE

load data local infile "/home/light/mysql/gps1.txt" into table loadTable fields terminated by ',' lines terminated by '\n' (carflag, touchevent, opstatus, gpstime, gpslongitude, gpslatitude, gpsspeed, gpsorientation, gpsstatus);

3.2 Python Batch Insert (pymysql)

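import pymysql

# Connection details and the INSERT statement are assumptions modeled on section 3.1.
conn = pymysql.connect(host='localhost', user='root', password='xxxx', database='loaddb')
cur = conn.cursor()
sql = ("insert into loadTable (carflag, touchevent, opstatus, gpstime, gpslongitude, "
       "gpslatitude, gpsspeed, gpsorientation, gpsstatus) "
       "values (%s, %s, %s, %s, %s, %s, %s, %s, %s)")
data_list = []
with open('/home/light/mysql/gps1.txt', 'r') as fp:
    for line in fp:
        # one comma-separated record per line
        data_list.append(line.rstrip('\n').split(','))
        # flush every 70,000 rows so the pending batch stays bounded
        if len(data_list) == 70000:
            cur.executemany(sql, data_list)
            conn.commit()
            data_list = []
if data_list:  # insert whatever is left after the last full batch
    cur.executemany(sql, data_list)
    conn.commit()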
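
The batching logic above can be separated from the database calls, which makes it easier to check on its own. The following is a minimal sketch, not the author's code: `parse_line` and `batches` are hypothetical helper names, and the sample lines stand in for the real GPS file.

```python
from itertools import islice
from typing import Iterable, Iterator, List, Tuple


def parse_line(line: str) -> Tuple[str, ...]:
    """Split one comma-separated record into its column values."""
    return tuple(line.rstrip("\n").split(","))


def batches(rows: Iterable, size: int) -> Iterator[List]:
    """Yield successive lists holding at most `size` rows each."""
    it = iter(rows)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch


# Usage with in-memory sample lines instead of the real file
sample = ["a,1\n", "b,2\n", "c,3\n"]
for batch in batches((parse_line(line) for line in sample), size=2):
    print(len(batch), batch)
```

Each yielded batch could then be handed to `cur.executemany(sql, batch)` followed by a commit, so the final partial batch needs no special case.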

3.3 Python Multiprocess Insert

import multiprocessing as mp

def multicore(self):
    # Range boundaries; each worker process handles one contiguous range via self.run
    file_list = [1,2324,4648,6972,9298]
    m1 = mp.Process(target=self.run, args=(file_list[0],file_list[1],'m1'))
    m2 = mp.Process(target=self.run, args=(file_list[1]+1,file_list[2],'m2'))
    m3 = mp.Process(target=self.run, args=(file_list[2]+1,file_list[3],'m3'))
    m4 = mp.Process(target=self.run, args=(file_list[3]+1,file_list[4],'m4'))
    # Start all four processes, then wait for every one of them to finish
    m1.start(); m2.start(); m3.start(); m4.start()
    m1.join(); m2.join(); m3.join(); m4.join()
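
The hard-coded boundaries above could also be computed. This is a small sketch under the assumption that the work items are numbered consecutively from 1 to 9298. `split_ranges` is a hypothetical helper, not part of the original script, and it expects `parts` to be no larger than the number of items.

```python
from typing import List, Tuple


def split_ranges(first: int, last: int, parts: int) -> List[Tuple[int, int]]:
    """Split the inclusive range [first, last] into `parts` contiguous, near-equal ranges."""
    total = last - first + 1
    base, extra = divmod(total, parts)
    ranges = []
    start = first
    for i in range(parts):
        # the first `extra` ranges take one additional item each
        size = base + (1 if i < extra else 0)
        ranges.append((start, start + size - 1))
        start += size
    return ranges


print(split_ranges(1, 9298, 4))
# [(1, 2325), (2326, 4650), (4651, 6974), (6975, 9298)]
```

Each `(start, end)` pair could be passed to one `mp.Process`, which keeps the ranges free of gaps and overlaps when the worker count changes.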

4. Synchronization Solutions

4.1 Sqoop (MySQL → HBase)

sqoop import \
  --connect jdbc:mysql://localhost:3306/loaddb \
  --username root --password xxxx \
  --query "SELECT id,carflag,touchevent,opstatus,gpstime,gpslongitude,gpslatitude,gpsspeed,gpsorientation,gpsstatus FROM loaddb.loadTable1 WHERE \$CONDITIONS" \
  --hbase-row-key id \
  --hbase-create-table \
  --column-family info \
  --hbase-table mysql_data \
  --split-by id -m 4

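To avoid out-of-memory failures, the data is imported in chunks of 40,000 rows rather than in a single pass. YARN's virtual memory check is also disabled in yarn-site.xml, so that containers are not killed for exceeding the virtual memory limit: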

<property>
  <name>yarn.nodemanager.vmem-check-enabled</name>
  <value>false</value>
</property>
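
The article does not show how the 40,000-row chunks were defined. One possible approach, sketched here under the assumption that `id` is a dense integer key, is to generate one WHERE predicate per chunk and substitute it into the `--query` string of a separate `sqoop import` run. `chunk_predicates` is a hypothetical helper.

```python
from typing import Iterator


def chunk_predicates(min_id: int, max_id: int, chunk_rows: int = 40_000) -> Iterator[str]:
    """Yield one WHERE predicate per chunk of `chunk_rows` consecutive ids."""
    start = min_id
    while start <= max_id:
        end = min(start + chunk_rows - 1, max_id)
        # $CONDITIONS is the placeholder that Sqoop requires in free-form queries
        yield f"id BETWEEN {start} AND {end} AND $CONDITIONS"
        start = end + 1


for predicate in chunk_predicates(1, 100_000):
    print(predicate)
# id BETWEEN 1 AND 40000 AND $CONDITIONS
# id BETWEEN 40001 AND 80000 AND $CONDITIONS
# id BETWEEN 80001 AND 100000 AND $CONDITIONS
```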

4.2 Kafka‑Thrift Pipeline

Enable MySQL binlog (ROW format) and use Maxwell to publish changes to Kafka. A Python consumer reads the JSON messages, transforms column names with the info: prefix, and writes batches to HBase via the Thrift API.

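# Example consumer snippet
import json
import happybase                  # assumption: HappyBase as the HBase Thrift client
from kafka import KafkaConsumer   # assumption: kafka-python as the Kafka client

# Topic, hosts, target table, and batch size below are assumptions.
consumer = KafkaConsumer('test', bootstrap_servers='localhost:9092')
table = happybase.Connection('localhost').table('mysql_data')
batch = table.batch(batch_size=1000)  # sends automatically once the batch fills up

for m in consumer:
    data = json.loads(m.value.decode('utf-8'))
    # only handle changes from the table being synchronized
    if data['database'] == 'loaddb' and data['table'] == 'sqlbase1':
        row = str(data['data'].pop('id'))  # the MySQL id becomes the HBase row key
        # prefix every remaining column with the info column family
        hbase_data = {f"info:{k}": str(v) for k, v in data['data'].items()}
        batch.put(row, hbase_data)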
4.3 Kafka‑Flink Real‑Time Pipeline

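// Flink 1.13 ships the universal FlinkKafkaConsumer; the version-specific
// FlinkKafkaConsumer011 was removed in Flink 1.12.
SingleOutputStreamOperator<Student> student = env.addSource(
    new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), props))
    .setParallelism(9)
    .map(s -> JSON.parseObject(s, Student.class))
    .setParallelism(9);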

student.timeWindowAll(Time.seconds(3)).apply(new AllWindowFunction<Student, List<Student>, TimeWindow>(){
    @Override
    public void apply(TimeWindow window, Iterable<Student> values, Collector<List<Student>> out) {
        List<Student> list = Lists.newArrayList(values);
        if (!list.isEmpty()) {
            out.collect(list);
        }
    }
}).addSink(new SinkToHBase()).setParallelism(9);

This pipeline reads MySQL changes from Kafka, groups them in 3‑second windows, and writes batches to HBase.
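
To illustrate what grouping into 3-second windows means, here is a plain-Python sketch of tumbling windows. It is not Flink code: `tumbling_windows` is a hypothetical function, and the timestamps are supplied by the caller, whereas the notion of time used by the Flink job depends on its configuration.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def tumbling_windows(events: Iterable[Tuple[float, str]],
                     size_seconds: float = 3.0) -> Dict[int, List[str]]:
    """Group (timestamp, payload) events into fixed, non-overlapping windows."""
    windows: Dict[int, List[str]] = defaultdict(list)
    for timestamp, payload in events:
        # window i covers the half-open interval [i * size, (i + 1) * size)
        window_index = int(timestamp // size_seconds)
        windows[window_index].append(payload)
    return dict(windows)


events = [(0.5, "a"), (2.9, "b"), (3.0, "c"), (7.2, "d")]
for index, batch in sorted(tumbling_windows(events).items()):
    print(f"window [{index * 3}s, {index * 3 + 3}s): {batch}")
# window [0s, 3s): ['a', 'b']
# window [3s, 6s): ['c']
# window [6s, 9s): ['d']
```

Windows that receive no events produce no batch at all, which mirrors the `isEmpty` check in the Java window function.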

5. Query Performance Comparison

Measured timings on the mysql_data table:

HBase shell, count 'mysql_data' (full table scan): ~3856 seconds (≈64 min)

HBase shell, count with INTERVAL => 10000000: ~3372 seconds (≈56 min)

Phoenix query: ~2015 seconds (≈34 min)

Co‑processor scan: ~1874 seconds (≈31 min)

Ranking from fastest to slowest: Co‑processor > Phoenix > Interval count > Full count.
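
The timings above can be converted to minutes and to a speedup relative to the full count with a few lines of arithmetic. The figures are the ones reported in this section; nothing here is newly measured.

```python
timings_seconds = {
    "full count": 3856,
    "interval count": 3372,
    "Phoenix": 2015,
    "coprocessor": 1874,
}

baseline = timings_seconds["full count"]
# list the methods from fastest to slowest
for name, seconds in sorted(timings_seconds.items(), key=lambda item: item[1]):
    minutes = seconds / 60
    speedup = baseline / seconds
    print(f"{name:15s} {seconds:5d} s  {minutes:5.1f} min  {speedup:4.2f}x vs full count")
```

On these numbers the coprocessor scan is roughly twice as fast as the plain shell count.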

6. Synchronization Efficiency Comparison

Approximate runtimes:

Sqoop import: ~50 hours

Kafka‑Thrift single‑row insert: similar to Sqoop

Kafka‑Thrift batch insert: ~7 hours

Kafka‑Flink pipeline: 3‑7 hours (fastest)

Key takeaways:

Large data must be split and inserted in batches.

Maintain ordered inserts for efficient HBase loading.

Long-running Phoenix queries on a table this size need their timeout settings raised so they are not cut off before finishing.

Flink provides the most stable and highest‑throughput solution.

Python‑Thrift can combine batch and single inserts.

Sqoop requires disabling virtual memory checks.

When HBase crashes, check Zookeeper first.

7. Conclusion

The guide demonstrates practical steps to set up a Hadoop‑HBase ecosystem, load massive MySQL data, and synchronize it using three different pipelines, while providing performance benchmarks and optimization tips for real‑world big‑data projects.

Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.

Written by

Programmer DD

A tinkering programmer and author of "Spring Cloud Microservices in Action"
