
Reading HBase with Flink 1.12 – Environment Setup, Code Samples, and Result

This article demonstrates how to configure Flink 1.12 to read data from HBase, covering the required environment components, HBase table creation, Maven dependencies, Java POJO and Flink‑SQL code, and showing the query results with and without printing the TableResult.

Big Data Technology & Architecture

Yesterday a group member asked how to read HBase with Flink 1.12, so I am sharing the following tutorial, originally written by Ashiamd.

1. Environment

zookeeper 3.6.2

HBase 2.4.0

Flink 1.12.1

2. HBase Table

# Create the table
create 'u_m_01' , 'u_m_r'

# Insert data
put 'u_m_01', 'a,A', 'u_m_r:r' , '1'
put 'u_m_01', 'a,B', 'u_m_r:r' , '3'
put 'u_m_01', 'b,B', 'u_m_r:r' , '3'
put 'u_m_01', 'b,C', 'u_m_r:r' , '4'
put 'u_m_01', 'c,A', 'u_m_r:r' , '2'
put 'u_m_01', 'c,C', 'u_m_r:r' , '5'
put 'u_m_01', 'c,D', 'u_m_r:r' , '1'
put 'u_m_01', 'd,B', 'u_m_r:r' , '5'
put 'u_m_01', 'd,D', 'u_m_r:r' , '2'
put 'u_m_01', 'e,A', 'u_m_r:r' , '3'
put 'u_m_01', 'e,B', 'u_m_r:r' , '2'
put 'u_m_01', 'f,A', 'u_m_r:r' , '1'
put 'u_m_01', 'f,B', 'u_m_r:r' , '2'
put 'u_m_01', 'f,D', 'u_m_r:r' , '3'
put 'u_m_01', 'g,C', 'u_m_r:r' , '1'
put 'u_m_01', 'g,D', 'u_m_r:r' , '4'
put 'u_m_01', 'h,A', 'u_m_r:r' , '1'
put 'u_m_01', 'h,B', 'u_m_r:r' , '2'
put 'u_m_01', 'h,C', 'u_m_r:r' , '4'
put 'u_m_01', 'h,D', 'u_m_r:r' , '5'

3. Maven Dependencies (pom.xml)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>flink-hive-hbase</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.12.1</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
        <hive.version>3.1.2</hive.version>
        <mysql.version>8.0.19</mysql.version>
        <hbase.version>2.4.0</hbase.version>
    </properties>
    <dependencies>
        <!-- Flink -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- ... (other dependencies omitted for brevity) ... -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
</project>

4. Flink‑Java Code

POJO class used for the result:

package entity;
import java.io.Serializable;
public class UserMovie implements Serializable {
    private static final long serialVersionUID = 256158274329337559L;
    private String userId;
    private String movieId;
    private Double ratting;
    // getters, setters, constructors, toString omitted for brevity
}
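Since the members of the POJO are omitted above, here is a minimal sketch of what they might look like. The constructor signature is inferred from the call in the main program, and the toString format is inferred from the sample output later in this article; everything else (the no-arg constructor, the getter and setter names) is an assumption. The class is package-private and has no package declaration only so the snippet compiles on its own.

```java
import java.io.Serializable;

// Hypothetical completion of the UserMovie POJO; the original omits these members.
class UserMovie implements Serializable {
    private static final long serialVersionUID = 256158274329337559L;

    private String userId;
    private String movieId;
    private Double ratting; // spelling kept to match the article's field name

    // No-arg constructor, commonly expected of POJOs (assumption)
    public UserMovie() {
    }

    // Matches the three-argument call used in the main program
    public UserMovie(String userId, String movieId, Double ratting) {
        this.userId = userId;
        this.movieId = movieId;
        this.ratting = ratting;
    }

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public String getMovieId() {
        return movieId;
    }

    public void setMovieId(String movieId) {
        this.movieId = movieId;
    }

    public Double getRatting() {
        return ratting;
    }

    public void setRatting(Double ratting) {
        this.ratting = ratting;
    }

    // Format inferred from the sample output shown in the Output section
    @Override
    public String toString() {
        return "UserMovie{userId='" + userId + "', movieId='" + movieId
                + "', ratting=" + ratting + '}';
    }
}
```

With this sketch, new UserMovie("a", "A", 1.0) is the kind of object the main program adds to its list for the row key a,A.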

Main test program that creates a Flink Table linked to HBase, runs a SQL query, and collects the rows into UserMovie objects:

package hbase;
import entity.UserMovie;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
public class HBaseTest_01 {
    public static void main(String[] args) throws Exception {
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);
        TableResult tableResult = tableEnv.executeSql(
                "CREATE TABLE u_m (" +
                " rowkey STRING," +
                " u_m_r ROW<r STRING>," +
                " PRIMARY KEY (rowkey) NOT ENFORCED" +
                " ) WITH (" +
                " 'connector' = 'hbase-2.2' ," +
                " 'table-name' = 'default:u_m_01' ," +
                " 'zookeeper.quorum' = '127.0.0.1:2181'" +
                " )");
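        // Query every row of the HBase-backed table.
        Table table = tableEnv.sqlQuery("SELECT * FROM u_m");
        TableResult executeResult = table.execute();
        CloseableIterator<Row> collect = executeResult.collect();
        // print() drains the same result iterator, so while this line is active
        // the loop below finds no rows. Comment it out to fill userMovieList.
        executeResult.print();
        List<UserMovie> userMovieList = new ArrayList<>();
        collect.forEachRemaining(new Consumer<Row>() {
            @Override
            public void accept(Row row) {
                // Field 0 is the row key in the form "userId,movieId".
                String field0 = String.valueOf(row.getField(0));
                String[] user_movie = field0.split(",");
                // Field 1 is the u_m_r column family; with its single column r,
                // its string form is the rating value (see the printed table).
                Double ratting = Double.valueOf(String.valueOf(row.getField(1)));
                userMovieList.add(new UserMovie(user_movie[0], user_movie[1], ratting));
            }
        });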
        System.out.println("................");
        for (UserMovie um : userMovieList) {
            System.out.println(um);
        }
    }
}
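The mapping step in the loop above assumes that every row key has the form userId,movieId and that every value parses as a number. The following is a minimal sketch of that parsing step with basic validation, written without Flink so it can be run on its own. RowKeyParser and ParsedRating are hypothetical names introduced for illustration and are not part of the original program.

```java
// Hypothetical helper illustrating the row-key parsing step; not from the original program.
class RowKeyParser {

    /** Simple immutable holder for one parsed rating. */
    static final class ParsedRating {
        final String userId;
        final String movieId;
        final double rating;

        ParsedRating(String userId, String movieId, double rating) {
            this.userId = userId;
            this.movieId = movieId;
            this.rating = rating;
        }
    }

    /**
     * Parses a "userId,movieId" row key and a numeric string value.
     * Throws IllegalArgumentException for a malformed key or value.
     */
    static ParsedRating parse(String rowKey, String value) {
        // Limit of 2 keeps any further commas inside the movie id part.
        String[] parts = rowKey.split(",", 2);
        if (parts.length != 2 || parts[0].isEmpty() || parts[1].isEmpty()) {
            throw new IllegalArgumentException(
                    "Expected row key 'userId,movieId' but got: " + rowKey);
        }
        // NumberFormatException is a subclass of IllegalArgumentException.
        double rating = Double.parseDouble(value.trim());
        return new ParsedRating(parts[0], parts[1], rating);
    }

    public static void main(String[] args) {
        ParsedRating r = parse("a,A", "1");
        System.out.println(r.userId + " rated " + r.movieId + " with " + r.rating);
    }
}
```

Failing fast on a malformed key is one possible design choice; the original code would instead throw an ArrayIndexOutOfBoundsException when the key contains no comma.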

5. Output

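When executeResult.print(); is left uncommented, Flink prints the table rows in a tabular format. Because print() consumes the result iterator, the UserMovie list stays empty and nothing appears after the dotted line: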

+--------------------------------+--------------------------------+
|                         rowkey |                          u_m_r |
+--------------------------------+--------------------------------+
|                            a,A |                              1 |
|                            a,B |                              3 |
... (remaining rows omitted) ...
+--------------------------------+--------------------------------+
20 rows in set
................

If the print line is commented out, the iterator is still unread when the loop runs, so the program collects the rows and prints the UserMovie objects:

................
UserMovie{userId='a', movieId='A', ratting=1.0}
UserMovie{userId='a', movieId='B', ratting=3.0}
... (remaining objects omitted) ...
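The difference between the two outputs comes down to a result iterator that can be read only once. The sketch below reproduces that effect with a plain java.util.Iterator rather than Flink's CloseableIterator, so it is an analogy under that assumption and not the Flink implementation. The method names drain and collectRemaining are hypothetical stand-ins for print() and the forEachRemaining loop.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Analogy only: a plain iterator standing in for the Flink result iterator.
class SinglePassIteratorDemo {

    /** Reads every element, standing in for what print() is assumed to do. */
    static int drain(Iterator<String> it) {
        int count = 0;
        while (it.hasNext()) {
            it.next();
            count++;
        }
        return count;
    }

    /** Collects whatever is left, standing in for the forEachRemaining loop. */
    static List<String> collectRemaining(Iterator<String> it) {
        List<String> out = new ArrayList<>();
        it.forEachRemaining(out::add);
        return out;
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("a,A", "a,B", "b,B");

        // Case 1: drain first, then collect. Nothing is left to collect.
        Iterator<String> first = rows.iterator();
        int printed = drain(first);
        System.out.println("drained " + printed + ", collected "
                + collectRemaining(first).size());

        // Case 2: collect without draining. All rows are available.
        Iterator<String> second = rows.iterator();
        System.out.println("collected " + collectRemaining(second).size());
    }
}
```

If both the printed table and the object list are needed, one option is to collect the rows into a list first and print from that list.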

Note

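In the Flink SQL table definition, the r column is declared as STRING even though the values look numeric. Declaring it as INT caused errors, most likely because the HBase shell stores the values from put as text bytes rather than as binary integers. STRING therefore works for this example, and the value is converted to a number in the Java code.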
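To illustrate the likely cause, the sketch below compares the bytes of a value stored as text with the bytes of the same value stored as a binary integer. It assumes that the HBase shell writes '3' as its UTF-8 text and that an INT column is decoded from a 4-byte big-endian integer; neither assumption is confirmed by the original article. The class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustration of text bytes versus binary int bytes; the storage layout is an assumption.
class StringVsIntBytes {

    /** Bytes of the value written as text, as the HBase shell is assumed to do. */
    static byte[] asText(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    /** Bytes of a 4-byte big-endian int, the layout assumed for an INT column. */
    static byte[] asInt(int value) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
    }

    /** Decodes text bytes to a number: read as STRING, then parse. */
    static int parseText(byte[] bytes) {
        return Integer.parseInt(new String(bytes, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        byte[] text = asText("3");
        byte[] binary = asInt(3);
        System.out.println("text bytes:   " + text.length);
        System.out.println("binary bytes: " + binary.length);
        System.out.println("parsed value: " + parseText(text));
    }
}
```

Under these assumptions, the text form of the value is shorter than the four bytes an integer decoder expects, which would explain why reading the column as STRING and parsing it afterwards succeeds where INT does not.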

Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
