Reading HBase with Flink 1.12 – Environment Setup, Code Samples, and Result
This article demonstrates how to configure Flink 1.12 to read data from HBase, covering the required environment components, HBase table creation, Maven dependencies, Java POJO and Flink‑SQL code, and showing the query results with and without printing the TableResult.
Yesterday a group member asked how to read HBase with Flink 1.12, so I am sharing the following tutorial, originally authored by Ashiamd.
1. Environment
zookeeper 3.6.2
HBase 2.4.0
Flink 1.12.1
2. HBase Table
# Create the table
create 'u_m_01' , 'u_m_r'
# Insert data
put 'u_m_01', 'a,A', 'u_m_r:r' , '1'
put 'u_m_01', 'a,B', 'u_m_r:r' , '3'
put 'u_m_01', 'b,B', 'u_m_r:r' , '3'
put 'u_m_01', 'b,C', 'u_m_r:r' , '4'
put 'u_m_01', 'c,A', 'u_m_r:r' , '2'
put 'u_m_01', 'c,C', 'u_m_r:r' , '5'
put 'u_m_01', 'c,D', 'u_m_r:r' , '1'
put 'u_m_01', 'd,B', 'u_m_r:r' , '5'
put 'u_m_01', 'd,D', 'u_m_r:r' , '2'
put 'u_m_01', 'e,A', 'u_m_r:r' , '3'
put 'u_m_01', 'e,B', 'u_m_r:r' , '2'
put 'u_m_01', 'f,A', 'u_m_r:r' , '1'
put 'u_m_01', 'f,B', 'u_m_r:r' , '2'
put 'u_m_01', 'f,D', 'u_m_r:r' , '3'
put 'u_m_01', 'g,C', 'u_m_r:r' , '1'
put 'u_m_01', 'g,D', 'u_m_r:r' , '4'
put 'u_m_01', 'h,A', 'u_m_r:r' , '1'
put 'u_m_01', 'h,B', 'u_m_r:r' , '2'
put 'u_m_01', 'h,C', 'u_m_r:r' , '4'
put 'u_m_01', 'h,D', 'u_m_r:r' , '5'
3. Maven Dependencies (pom.xml)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>flink-hive-hbase</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.12.1</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
        <hive.version>3.1.2</hive.version>
        <mysql.version>8.0.19</mysql.version>
        <hbase.version>2.4.0</hbase.version>
    </properties>
    <dependencies>
        <!-- Flink -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        ... (other dependencies omitted for brevity) ...
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
</project>
4. Flink‑Java Code
POJO class used for the result:
package entity;
import java.io.Serializable;
public class UserMovie implements Serializable {
    private static final long serialVersionUID = 256158274329337559L;
    private String userId;
    private String movieId;
    private Double ratting;
    // getters, setters, constructors, toString omitted for brevity
}
Main test program that creates a Flink Table linked to HBase, runs a SQL query, and collects the rows into UserMovie objects:
package hbase;
import entity.UserMovie;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
public class HBaseTest_01 {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);
        // Register a Flink table backed by the HBase table default:u_m_01
        TableResult tableResult = tableEnv.executeSql(
                "CREATE TABLE u_m (" +
                " rowkey STRING," +
                " u_m_r ROW<r STRING>," +
                " PRIMARY KEY (rowkey) NOT ENFORCED" +
                " ) WITH (" +
                " 'connector' = 'hbase-2.2' ," +
                " 'table-name' = 'default:u_m_01' ," +
                " 'zookeeper.quorum' = '127.0.0.1:2181'" +
                " )");
        Table table = tableEnv.sqlQuery("SELECT * FROM u_m");
        TableResult executeResult = table.execute();
        CloseableIterator<Row> collect = executeResult.collect();
        // Prints the rows as a table; comment this line out, otherwise it
        // consumes the rows before the loop below can read them
        executeResult.print();
        List<UserMovie> userMovieList = new ArrayList<>();
        collect.forEachRemaining(new Consumer<Row>() {
            @Override
            public void accept(Row row) {
                // Field 0 is the rowkey in the form "userId,movieId"
                String field0 = String.valueOf(row.getField(0));
                String[] user_movie = field0.split(",");
                // Field 1 is the u_m_r column family holding the rating as a string
                Double ratting = Double.valueOf(String.valueOf(row.getField(1)));
                userMovieList.add(new UserMovie(user_movie[0], user_movie[1], ratting));
            }
        });
        System.out.println("................");
        for (UserMovie um : userMovieList) {
            System.out.println(um);
        }
    }
}
5. Output
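The UserMovie lines in the second output of this section come from splitting each rowkey on the comma and parsing the rating string as a Double. The sketch below isolates that step in plain Java, with no Flink dependency. The nested UserMovie class is only a stand-in for entity.UserMovie (the article omits its constructor and toString), and toUserMovie is a hypothetical helper that is not part of the original program. Unlike the original accept(...) method, it rejects rowkeys that do not have exactly two parts.

```java
public class RowMappingSketch {

    // Stand-in for entity.UserMovie; constructor and toString are assumptions
    // modeled on the output format shown in this article.
    static final class UserMovie {
        final String userId;
        final String movieId;
        final Double ratting; // spelling kept from the original field name

        UserMovie(String userId, String movieId, Double ratting) {
            this.userId = userId;
            this.movieId = movieId;
            this.ratting = ratting;
        }

        @Override
        public String toString() {
            return "UserMovie{userId='" + userId + "', movieId='" + movieId
                    + "', ratting=" + ratting + "}";
        }
    }

    // Hypothetical helper: maps one (rowkey, rating) pair to a UserMovie and
    // fails fast when the rowkey is not of the form "userId,movieId".
    static UserMovie toUserMovie(String rowkey, String rating) {
        String[] parts = rowkey.split(",", -1);
        if (parts.length != 2) {
            throw new IllegalArgumentException("Unexpected rowkey: " + rowkey);
        }
        return new UserMovie(parts[0], parts[1], Double.valueOf(rating.trim()));
    }

    public static void main(String[] args) {
        // prints: UserMovie{userId='a', movieId='A', ratting=1.0}
        System.out.println(toUserMovie("a,A", "1"));
    }
}
```

Keeping the mapping in a small static method like this makes it possible to test the parsing without a running HBase or Flink cluster.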
When executeResult.print(); is left uncommented, Flink prints the table rows in a tabular format:
+--------------------------------+--------------------------------+
| rowkey | u_m_r |
+--------------------------------+--------------------------------+
| a,A | 1 |
| a,B | 3 |
... (remaining rows omitted) ...
+--------------------------------+--------------------------------+
20 rows in set
................
If the print line is commented out, the program instead iterates over the collected rows and prints the UserMovie objects:
................
UserMovie{userId='a', movieId='A', ratting=1.0}
UserMovie{userId='a', movieId='B', ratting=3.0}
... (remaining objects omitted) ...
Note
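The two runs above differ because, as the comment on the print line indicates, printing consumes the result rows, so nothing is left for the forEachRemaining loop afterwards. The following plain-Java sketch illustrates that one-shot behavior with an ordinary java.util.Iterator standing in for the CloseableIterator. It is an analogy under that assumption, not Flink code, and the drain helper is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class OneShotIteratorSketch {

    // Hypothetical helper: moves whatever is still left in the iterator into a list.
    static List<String> drain(Iterator<String> it) {
        List<String> out = new ArrayList<>();
        it.forEachRemaining(out::add);
        return out;
    }

    public static void main(String[] args) {
        Iterator<String> rows = Arrays.asList("a,A", "a,B", "b,B").iterator();

        // First consumer, standing in for executeResult.print()
        List<String> printed = drain(rows);
        // Second consumer, standing in for collect.forEachRemaining(...)
        List<String> collected = drain(rows);

        System.out.println(printed.size());   // prints 3
        System.out.println(collected.size()); // prints 0
    }
}
```

If both the printed table and the list of objects are needed, one option is to collect the rows into a list once and then print from that list.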
In the Flink SQL table definition, the r column is declared as STRING even though its values are numeric. Declaring it as INT caused errors, so STRING is kept for this example and the value is converted to a Double in the Java code.
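One likely explanation, which is an assumption and not something the original author states: the hbase shell put commands above store '1' as text bytes, while an INT column would be expected in a 4-byte binary encoding. The sketch below uses only the JDK to show how different the two byte layouts are. The class and method names are illustrative.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class StringVsIntBytesSketch {

    // Bytes of a value written as text, e.g. by put 'u_m_01', 'a,A', 'u_m_r:r', '1'
    static byte[] asText(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // Bytes of the same value written as a 4-byte big-endian integer
    static byte[] asBinaryInt(int value) {
        return ByteBuffer.allocate(4).putInt(value).array();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(asText("1")));    // prints [49]
        System.out.println(Arrays.toString(asBinaryInt(1))); // prints [0, 0, 0, 1]
    }
}
```

Under that assumption, a one-byte text value cannot be decoded as a four-byte integer, which would be consistent with reading the column as STRING and converting it afterwards.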
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
