Building a Flink SQL Platform on Zeppelin: Installation, Configuration, and Advanced Use Cases

This guide walks through setting up Apache Zeppelin as a low‑cost, SQL‑centric development platform for Flink, covering environment preparation, installation, interpreter configuration, execution modes, verification, common pitfalls, dimension‑table joins, custom UDFs, Redis integration, and dual‑stream join techniques.

Big Data Technology & Architecture

This article explains, step by step, how to build a Flink SQL development platform on Apache Zeppelin, so that jobs can be written in familiar SQL instead of Java or Scala code.

Environment preparation : Install Zeppelin 0.9.0‑SNAPSHOT and Flink 1.10+, then unpack Zeppelin, rename the configuration template, and set JAVA_HOME and ZEPPELIN_ADDR in zeppelin-env.sh.

# 1.1 Unpack the archive
tar -zxvf zeppelin-0.9.0-SNAPSHOT.tar.gz
# 1.2 Enter the conf directory
cd zeppelin-0.9.0-SNAPSHOT/conf
# 1.3 Rename the config template
mv zeppelin-env.sh.template zeppelin-env.sh
# 1.4 Edit the config file
vim zeppelin-env.sh
# 1.5 Add these two lines to zeppelin-env.sh
export JAVA_HOME=/path/to/jdk
export ZEPPELIN_ADDR=your.ip.address

If Flink runs on YARN, add the required Hadoop‑compatible JARs to ~/flink/lib (e.g., flink-hadoop-compatibility_2.11-1.10.0.jar, flink-shaded-hadoop-2-uber-2.7.5-9.0.jar) and the Hive connector JARs.

Start Zeppelin by running ./zeppelin-daemon.sh start from the bin directory, and check that the output shows Zeppelin start [ OK ]. The UI is then available at http://<host>:8080.

Interpreter configuration : In the Zeppelin UI, open the Interpreter page, find the Flink interpreter, and set FLINK_HOME and flink.execution.mode (local, remote, or yarn). For yarn mode, also set HADOOP_CONF_DIR.
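
As an alternative to the Interpreter page, these properties can usually be set per notebook in a %flink.conf paragraph, which has to run before the Flink interpreter process starts. The following is a minimal sketch for yarn mode; both paths are placeholders, not values from the original article.

```properties
%flink.conf
# Placeholder paths: adjust to the local installation
FLINK_HOME /path/to/flink
flink.execution.mode yarn
HADOOP_CONF_DIR /path/to/hadoop/conf
```

If the interpreter is already running, it needs to be restarted before changed values are picked up.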

Verification : Open a demo notebook, run a simple Scala WordCount batch job, and confirm the result appears both in Zeppelin and on the Flink/YARN Web UI.

Common errors :

Java version too low – point JAVA_HOME at JDK 8u151 or later, or modify zeppelin/bin/common.sh so that it uses a newer JDK.

Network unreachable – export ZEPPELIN_LOCAL_IP in Zeppelin's environment and restart Zeppelin.

Dimension‑table join (Dim‑Join) : Register a File System source table and a Kafka sink table using Flink SQL, then join with a MySQL dimension table. Example DDL:

%flink.ssql
-- File System Source DDL
DROP TABLE IF EXISTS t1;
CREATE TABLE t1 (
  user_id BIGINT,
  item_id BIGINT,
  category_id BIGINT,
  behavior STRING,
  ts BIGINT
) WITH (
  'connector.type' = 'filesystem',
  'connector.path' = 'hdfs:///test/UserBehavior.csv',
  'format.type' = 'csv',
  'format.field-delimiter' = ','
);

-- Kafka Sink DDL
DROP TABLE IF EXISTS t2;
CREATE TABLE t2 (
  user_id BIGINT,
  item_id BIGINT,
  category_id BIGINT,
  behavior STRING,
  ts BIGINT
) WITH (
  'update-mode' = 'append',
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'zeppelin_01_test',
  'connector.properties.zookeeper.connect' = '127.0.0.1:2181',
  'connector.properties.bootstrap.servers' = '127.0.0.1:9092',
  'format.type' = 'json'
);

Then insert the rows of t1 into the Kafka table t2 and run the join query against the MySQL dimension table; the result updates continuously as new records arrive.
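
This summary does not reproduce the MySQL side of the join. A minimal sketch of what it could look like with the Flink 1.10 JDBC connector follows. The table names t3 and mysql_dim, the MySQL database and table, the column behavior_desc, and the credentials are all hypothetical; t3 is assumed to read the same topic that t2 writes to, with an added processing-time column p.

```sql
%flink.ssql
-- Hypothetical Kafka source over the topic written by t2,
-- with a processing-time attribute p for the lookup join
DROP TABLE IF EXISTS t3;
CREATE TABLE t3 (
  user_id BIGINT,
  item_id BIGINT,
  category_id BIGINT,
  behavior STRING,
  ts BIGINT,
  p AS PROCTIME()
) WITH (
  'update-mode' = 'append',
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'zeppelin_01_test',
  'connector.properties.bootstrap.servers' = '127.0.0.1:9092',
  'connector.startup-mode' = 'earliest-offset',
  'format.type' = 'json'
);

-- Hypothetical MySQL dimension table (database, table, and credentials are placeholders)
DROP TABLE IF EXISTS mysql_dim;
CREATE TABLE mysql_dim (
  behavior STRING,
  behavior_desc STRING
) WITH (
  'connector.type' = 'jdbc',
  'connector.url' = 'jdbc:mysql://127.0.0.1:3306/test',
  'connector.table' = 'behavior_dim',
  'connector.driver' = 'com.mysql.jdbc.Driver',
  'connector.username' = 'user',
  'connector.password' = 'password',
  'connector.lookup.cache.max-rows' = '100',
  'connector.lookup.cache.ttl' = '10s'
);

%flink.ssql
-- Copy the file-system rows into Kafka
INSERT INTO t2 SELECT * FROM t1;

%flink.ssql(type=update)
-- Lookup join: each Kafka record is enriched from MySQL at processing time
SELECT a.user_id, a.item_id, a.behavior, d.behavior_desc
FROM t3 AS a
LEFT JOIN mysql_dim FOR SYSTEM_TIME AS OF a.p AS d
ON a.behavior = d.behavior;
```

The lookup cache settings trade freshness for fewer MySQL round trips; with a 10 second TTL, a changed dimension row may take up to that long to show in the join result.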

Custom UDF registration : UDFs can be registered in two ways: by calling tEnv.registerFunction or with a CREATE FUNCTION statement. Zeppelin can also load every UDF in a JAR automatically through the flink.udf.jars property:

%flink.conf
flink.udf.jars /home/data/flink/lib_me/flink-udf-1.0-SNAPSHOT.jar

After loading, show functions lists the available UDFs, and they can be used directly, e.g., select javaupper('a').
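
For comparison, the CREATE FUNCTION route could look roughly like the sketch below. The function name my_upper and the class name com.example.udf.JavaUpper are hypothetical (the summary does not give the real class name), the JAR is assumed to be on the interpreter's classpath already, and whether a given Zeppelin build accepts this statement in a %flink.ssql paragraph should be verified.

```sql
%flink.ssql
-- Hypothetical names: replace the class with the UDF's fully qualified name
CREATE FUNCTION my_upper AS 'com.example.udf.JavaUpper';

%flink.ssql(type=update)
-- Call the function like any built-in
SELECT my_upper('a');
```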

Redis dimension table : Load a Redis connector JAR, define a Redis table, and join it with the streaming table, applying the custom UDF to convert values to uppercase.

%flink.ssql
-- Redis Dim DDL
DROP TABLE IF EXISTS redis_dim;
CREATE TABLE redis_dim (
  first STRING,
  name STRING
) WITH (
  'connector.type' = 'redis',
  'connector.ip' = '127.0.0.1',
  'connector.port' = '6379',
  'connector.lookup.cache.max-rows' = '10',
  'connector.lookup.cache.ttl' = '10000000',
  'connector.version' = '2.6'
);

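%flink.ssql(type=update)
-- t3 is the streaming table; a.p is assumed to be its processing-time attribute
SELECT a.*, javaupper(b.name)
FROM t3 a LEFT JOIN redis_dim FOR SYSTEM_TIME AS OF a.p AS b
ON a.behavior = b.first
-- this filter drops unmatched rows, so the LEFT JOIN behaves like an inner join
WHERE b.name IS NOT NULL AND b.name <> '';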

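Dual‑stream join : The article demonstrates both an unbounded (regular) join and a time‑interval join. An unbounded join keeps the rows of both inputs in state indefinitely, so state can grow without bound. A time‑interval join only matches rows whose event‑time or processing‑time attributes lie within a bounded range of each other, which lets Flink evict rows that can no longer match.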
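
The two join styles differ mainly in the join condition. The following is a minimal sketch, assuming two hypothetical append-only stream tables, orders(order_id, order_time) and payments(order_id, pay_time), that are already registered and whose time columns are declared as event-time attributes; none of these names come from the original article.

```sql
%flink.ssql(type=update)
-- Unbounded (regular) join: every row of both inputs stays in state.
-- The time attributes are cast to plain timestamps first, because a
-- regular join does not accept rowtime attributes in its inputs.
SELECT o.order_id, o.order_ts, p.pay_ts
FROM (SELECT order_id, CAST(order_time AS TIMESTAMP(3)) AS order_ts FROM orders) AS o
JOIN (SELECT order_id, CAST(pay_time AS TIMESTAMP(3)) AS pay_ts FROM payments) AS p
ON o.order_id = p.order_id;

%flink.ssql(type=update)
-- Time-interval join: a payment matches only if it arrives within
-- 10 minutes after the order, so older rows can be dropped from state.
-- The casts in the SELECT list keep the result free of rowtime attributes.
SELECT o.order_id,
       CAST(o.order_time AS TIMESTAMP(3)) AS order_ts,
       CAST(p.pay_time AS TIMESTAMP(3)) AS pay_ts
FROM orders AS o
JOIN payments AS p
ON o.order_id = p.order_id
AND p.pay_time BETWEEN o.order_time AND o.order_time + INTERVAL '10' MINUTE;
```

The 10 minute bound is arbitrary; a wider interval tolerates later matches at the cost of holding more rows in state.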

Finally, the article notes that Flink on Zeppelin also supports Hive streaming, which is covered in a separate guide.

Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.

Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.