How to Sync Billions of MySQL Records to HBase: 3 Powerful Methods Using Hadoop, Kafka, and Flink
This comprehensive guide walks you through setting up a pseudo‑distributed Hadoop environment, loading massive MySQL data with LOAD DATA, Python scripts, and multithreading, and then synchronizing the data to HBase using three approaches—Sqoop, a Kafka‑Thrift pipeline, and a real‑time Kafka‑Flink pipeline—while also comparing query performance of HBase and Phoenix.
1. Introduction
Learn how to transfer billions of rows from MySQL to HBase using three different synchronization schemes and practical implementations.
2. Environment Preparation
Deploy a pseudo‑distributed Hadoop cluster on Ubuntu 16.04 and install the required components: Hadoop, HBase, Zookeeper, Phoenix, Kafka, Maxwell, and Flink.
2.1 Install Java
sudo apt-get install openjdk-8-jdk
2.2 Create Hadoop User
sudo groupadd hadoop
sudo useradd -s /bin/bash -g hadoop -d /home/hadoop -m hadoop
2.3 Install Hadoop
wget https://mirrors.cnnic.cn/apache/hadoop/common/hadoop-3.0.2/hadoop-3.0.2.tar.gz
tar zxvf hadoop-3.0.2.tar.gz
sudo mv hadoop-3.0.2 /usr/local/hadoop
2.4 Configure core‑site.xml and hdfs‑site.xml
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://localhost:9012</value>
  </property>
</configuration>
2.5 Start Hadoop
cd /usr/local/hadoop/sbin
./start-dfs.sh
./start-yarn.sh
2.6 Install and Configure HBase
wget https://mirrors.cnnic.cn/apache/hbase/stable/hbase-1.4.9-bin.tar.gz
tar zxvf hbase-1.4.9-bin.tar.gz
sudo mv hbase-1.4.9-bin /usr/local/hbase
<configuration>
  <property>
    <name>hbase.rootdir</name>
    <value>hdfs://localhost:9012/hbase</value>
  </property>
  <property>
    <name>hbase.zookeeper.property.dataDir</name>
    <value>/home/hadoop/zookeeper</value>
  </property>
</configuration>
bin/start-hbase.sh
2.7 Install Phoenix
tar -xvf apache-phoenix-4.14.2-HBase-1.4-bin.tar.gz
mv phoenix-4.14.2-HBase-1.4 /usr/local/phoenix
2.8 Install Kafka and Zookeeper
wget http://kafka.apache.org/downloads/kafka_2.12-2.8.0.tgz
tar -xzf kafka_2.12-2.8.0.tgz
sudo mv kafka_2.12-2.8.0 /usr/local/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
2.9 Install Maxwell (MySQL binlog to Kafka)
./maxwell/bin/maxwell --user='maxwell' --password='maxwell' --host='127.0.0.1' --producer=kafka --kafka.bootstrap.servers=localhost:9092 --kafka_topic=test
2.10 Install Flink
wget https://downloads.apache.org/flink/flink-1.13.2/flink-1.13.2-bin-scala_2.12.tgz
tar -zxf flink-1.13.2-bin-scala_2.12.tgz
mv flink-1.13.2 /usr/local/flink
cd /usr/local/flink/bin
./start-cluster.sh
3. MySQL Data Insertion Methods
Three ways to load data into MySQL are demonstrated.
3.1 LOAD DATA INFILE
load data local infile "/home/light/mysql/gps1.txt" into table loadTable fields terminated by ',' lines terminated by "\n" (carflag, touchevent, opstatus, gpstime, gpslongitude, gpslatitude, gpsspeed, gpsorientation, gpsstatus);
3.2 Python Batch Insert (pymysql)
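import pymysql

# Connection parameters are placeholders; adjust them to your environment.
conn = pymysql.connect(host='localhost', user='root', password='xxxx', database='loaddb')
cur = conn.cursor()
# Column list taken from the LOAD DATA statement in 3.1.
sql = ("insert into loadTable (carflag, touchevent, opstatus, gpstime, gpslongitude, "
       "gpslatitude, gpsspeed, gpsorientation, gpsstatus) values (%s,%s,%s,%s,%s,%s,%s,%s,%s)")
data_list = []
with open('/home/light/mysql/gps1.txt', 'r') as fp:
    for count, line in enumerate(fp, start=1):
        # parse line and add to data_list
        data_list.append(tuple(line.rstrip('\n').split(',')))
        if count % 70000 == 0:  # flush every 70,000 rows
            cur.executemany(sql, data_list)
            conn.commit()
            data_list = []
if data_list:  # flush the final partial batch
    cur.executemany(sql, data_list)
    conn.commit()
3.3 Python Multithreaded Insert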
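# mp is the multiprocessing module (import multiprocessing as mp), so each worker runs as a separate process.
def multicore(self):
    # Range boundaries: each worker receives a start value, an end value, and a name.
    file_list = [1,2324,4648,6972,9298]
    m1 = mp.Process(target=self.run, args=(file_list[0],file_list[1],'m1'))
    m2 = mp.Process(target=self.run, args=(file_list[1]+1,file_list[2],'m2'))
    m3 = mp.Process(target=self.run, args=(file_list[2]+1,file_list[3],'m3'))
    m4 = mp.Process(target=self.run, args=(file_list[3]+1,file_list[4],'m4'))
    # Start all four workers, then wait for every one of them to finish.
    m1.start(); m2.start(); m3.start(); m4.start()
    m1.join(); m2.join(); m3.join(); m4.join()
4. Synchronization Solutions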
4.1 Sqoop (MySQL → HBase)
sqoop import \
--connect jdbc:mysql://localhost:3306/loaddb \
--username root --password xxxx \
--query "SELECT id,carflag,touchevent,opstatus,gpstime,gpslongitude,gpslatitude,gpsspeed,gpsorientation,gpsstatus FROM loaddb.loadTable1 WHERE \$CONDITIONS" \
--hbase-row-key id \
--hbase-create-table \
--column-family info \
--hbase-table mysql_data \
--split-by id -m 4
Data is split into 40,000‑row chunks to avoid memory overflow. Virtual memory checking is disabled in yarn-site.xml:
<property>
  <name>yarn.nodemanager.vmem-check-enabled</name>
  <value>false</value>
</property>
4.2 Kafka‑Thrift Pipeline
Enable MySQL binlog (ROW format) and use Maxwell to publish changes to Kafka. A Python consumer reads the JSON messages, transforms column names with the info: prefix, and writes batches to HBase via the Thrift API.
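As a rough illustration of the transform-and-batch step described above, the following self-contained sketch converts Maxwell-style JSON messages into HBase row dictionaries and flushes them in fixed-size groups. The helper names (`to_hbase_row`, `BufferedWriter`), the sample column `carflag`, and the batch size of 2 are hypothetical and chosen only for the demonstration; the flush target here is a plain Python list, not a real Thrift connection.

```python
import json
from typing import Callable, Dict, List, Optional, Tuple

Row = Tuple[str, Dict[str, str]]


def to_hbase_row(raw: bytes, database: str, table: str,
                 family: str = "info") -> Optional[Row]:
    """Turn one Maxwell JSON message into (row_key, columns), or None to skip it."""
    msg = json.loads(raw.decode("utf-8"))
    if msg.get("database") != database or msg.get("table") != table:
        return None
    if msg.get("type") not in ("insert", "update"):
        return None  # deletes would need their own code path
    data = dict(msg["data"])
    row_key = str(data.pop("id"))  # the MySQL primary key becomes the row key
    return row_key, {f"{family}:{k}": str(v) for k, v in data.items()}


class BufferedWriter:
    """Collects rows and passes them to flush_fn in groups of batch_size."""

    def __init__(self, flush_fn: Callable[[List[Row]], None], batch_size: int):
        self.flush_fn = flush_fn
        self.batch_size = batch_size
        self.buffer: List[Row] = []

    def add(self, row: Row) -> None:
        self.buffer.append(row)
        if len(self.buffer) >= self.batch_size:
            self.flush()

    def flush(self) -> None:
        if self.buffer:
            self.flush_fn(self.buffer)
            self.buffer = []


# Demo with in-memory messages instead of a live Kafka topic.
messages = [
    json.dumps({"database": "loaddb", "table": "sqlbase1", "type": "insert",
                "data": {"id": i, "carflag": f"A{i}"}}).encode("utf-8")
    for i in (1, 2, 3)
]
messages.append(json.dumps({"database": "other", "table": "t", "type": "insert",
                            "data": {"id": 9}}).encode("utf-8"))

flushed: List[List[Row]] = []
writer = BufferedWriter(flushed.append, batch_size=2)
for raw in messages:
    row = to_hbase_row(raw, "loaddb", "sqlbase1")
    if row is not None:
        writer.add(row)
writer.flush()  # write whatever is left in the buffer
```

In a real consumer, `flush_fn` would wrap the Thrift batch write, and the final `flush()` matters because the last group is usually smaller than the batch size.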
# Example consumer snippet
for m in consumer:
    data = json.loads(m.value.decode('utf-8'))
    if data['database']=='loaddb' and data['table']=='sqlbase1':
        row_id = data['data']['id']
        row = str(row_id)
        del data['data']['id']
        hbase_data = {f"info:{k}": str(v) for k,v in data['data'].items()}
        batch.put(row, hbase_data)
4.3 Kafka‑Flink Real‑Time Pipeline
SingleOutputStreamOperator<Student> student = env.addSource(
        new FlinkKafkaConsumer011<>("test", new SimpleStringSchema(), props))
    .setParallelism(9)
    .map(s -> JSON.parseObject(s, Student.class))
    .setParallelism(9);
student.timeWindowAll(Time.seconds(3)).apply(new AllWindowFunction<Student, List<Student>, TimeWindow>() {
    @Override
    public void apply(TimeWindow window, Iterable<Student> values, Collector<List<Student>> out) {
        List<Student> list = Lists.newArrayList(values);
        if (!list.isEmpty()) {
            out.collect(list);
        }
    }
}).addSink(new SinkToHBase()).setParallelism(9);
This pipeline reads MySQL changes from Kafka, groups them in 3‑second windows, and writes batches to HBase.
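The windowing idea can be illustrated outside Flink. The sketch below is plain Python, not Flink code: it groups timestamped records into fixed, non-overlapping 3-second windows and returns one batch per non-empty window. The function name `tumbling_batches` and the sample records are hypothetical, and explicit timestamps stand in for the time that Flink would assign.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

WINDOW_SECONDS = 3  # same window length as Time.seconds(3) in the Java code


def tumbling_batches(events: Iterable[Tuple[float, dict]],
                     window_seconds: int = WINDOW_SECONDS) -> List[List[dict]]:
    """Group (timestamp, record) pairs into fixed, non-overlapping windows."""
    windows: Dict[int, List[dict]] = defaultdict(list)
    for ts, record in events:
        # Every timestamp maps to the start of exactly one window.
        window_start = int(ts // window_seconds) * window_seconds
        windows[window_start].append(record)
    # Only windows that received records exist in the dict, which plays
    # the same role as the isEmpty() guard in the Java code.
    return [windows[start] for start in sorted(windows)]


sample = [(0.5, {"id": 1}), (2.9, {"id": 2}), (3.0, {"id": 3}), (7.2, {"id": 4})]
batches = tumbling_batches(sample)
for batch in batches:
    print([record["id"] for record in batch])
```

Each returned batch corresponds to one list handed to the sink, so a longer window means fewer but larger writes to HBase.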
5. Query Performance Comparison
Using the HBase shell:
count 'mysql_data'
Full table scan takes ~3856 seconds (≈1 hour 4 min).
Using INTERVAL => 10000000 reduces it to ~3372 seconds (≈56 min).
Phoenix query time is ~2015 seconds (≈33 min).
Co‑processor scan finishes in ~1874 seconds (≈31 min).
Ranking of speed: Co‑processor > Phoenix > Interval count > Full count.
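To put the reported timings side by side, the short calculation below derives the ranking and the speedup of each method relative to the full count. It uses only the four figures quoted above; the dictionary keys are labels chosen here for readability.

```python
# Timings in seconds as reported above for counting rows in 'mysql_data'.
timings = {
    "full count": 3856,
    "interval count": 3372,
    "phoenix": 2015,
    "coprocessor": 1874,
}

baseline = timings["full count"]
speedups = {name: baseline / secs for name, secs in timings.items()}
ranking = sorted(timings, key=timings.get)  # fastest first

for name in ranking:
    print(f"{name:15s} {timings[name] / 60:6.1f} min  "
          f"{speedups[name]:.2f}x vs full count")
```

By these numbers the co-processor and Phoenix counts are roughly twice as fast as the plain shell count, while the INTERVAL setting alone gives only a modest gain.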
6. Synchronization Efficiency Comparison
Approximate runtimes:
Sqoop import: ~50 hours
Kafka‑Thrift single‑row insert: similar to Sqoop
Kafka‑Thrift batch insert: ~7 hours
Kafka‑Flink pipeline: 3‑7 hours (fastest)
Key takeaways:
Large data must be split and inserted in batches.
Maintain ordered inserts for efficient HBase loading.
Phoenix queries benefit from timeout settings.
Flink provides the most stable and highest‑throughput solution.
Python‑Thrift can combine batch and single inserts.
Sqoop requires disabling virtual memory checks.
When HBase crashes, check Zookeeper first.
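The first two takeaways (split large data, keep inserts ordered) can be sketched as a small range generator. This is an illustrative helper, not the code used for the measurements above: the name `id_ranges`, the id bounds 1 to 100,000, and the `BETWEEN` clause are assumptions, while the 40,000 chunk size comes from section 4.1.

```python
from typing import Iterator, List, Tuple


def id_ranges(first_id: int, last_id: int,
              chunk_size: int) -> Iterator[Tuple[int, int]]:
    """Yield inclusive (start, end) id ranges in ascending order."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    start = first_id
    while start <= last_id:
        end = min(start + chunk_size - 1, last_id)
        yield start, end
        start = end + 1


# Hypothetical table with ids 1..100000, split into 40,000-row chunks.
ranges: List[Tuple[int, int]] = list(id_ranges(1, 100_000, 40_000))
where_clauses = [f"id BETWEEN {lo} AND {hi}" for lo, hi in ranges]
for clause in where_clauses:
    print(clause)
```

Because the ranges are produced in ascending order and never overlap, each chunk can be imported on its own and a failed chunk can be retried without touching the others.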
7. Conclusion
The guide demonstrates practical steps to set up a Hadoop‑HBase ecosystem, load massive MySQL data, and synchronize it using three different pipelines, while providing performance benchmarks and optimization tips for real‑world big‑data projects.
Programmer DD
A tinkering programmer and author of "Spring Cloud Microservices in Action"
