Apache Doris Installation, Cluster Deployment, Operations Manual, and Integration with Spark & Flink

This guide provides step‑by‑step instructions for downloading Apache Doris, configuring and deploying FE, BE, and Broker nodes, performing scaling operations, managing users and tables, importing and exporting data, and integrating Doris with Spark and Flink using code examples.

Big Data Technology & Architecture

Doris Installation

Download the binary package from the official website (https://doris.apache.org/zh-CN/downloads/downloads.html) and extract it to /opt/module/:

tar -zxvf apache-doris-1.0.0-incubating-bin.tar.gz -C /opt/module/

Cluster Planning

FE Deployment

Edit conf/fe.conf to set the meta directory:

meta_dir = /opt/module/doris-meta

Create the meta directory on each node and start the FE service:

# Create meta folder
mkdir /opt/module/doris-meta
# Run on all three machines
sh bin/start_fe.sh --daemon

BE Deployment

Edit conf/be.conf to configure the storage roots. Separate multiple paths with ';'; each path may optionally be followed by ',' and a capacity limit in GB:

# doris_storage1 is limited to 10 GB; doris_storage2 has no explicit limit
storage_root_path = /opt/module/doris_storage1,10;/opt/module/doris_storage2
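
The two separators in storage_root_path are easy to mix up, so it can help to check a value before writing it into be.conf. The following is a minimal sketch, not part of Doris: the function name parse_storage_root_path and the "default" label are illustrative choices, and only the simple path[,capacity] form shown above is handled.

```shell
# Hypothetical helper (not shipped with Doris): split a storage_root_path
# value into one "path<TAB>capacity_gb" line per entry.
parse_storage_root_path() {
  printf '%s\n' "$1" | tr ';' '\n' | while IFS=, read -r path cap; do
    # Skip empty entries such as the one produced by a trailing ';'
    [ -n "$path" ] || continue
    # "default" is a label chosen here for entries with no explicit limit
    printf '%s\t%s\n' "$path" "${cap:-default}"
  done
}

parse_storage_root_path "/opt/module/doris_storage1,10;/opt/module/doris_storage2"
```

For the value used in this guide, the first line reports a capacity of 10 and the second reports "default".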

Create the storage directories on each node and start the BE service:

# Create storage paths
mkdir /opt/module/doris_storage1
mkdir /opt/module/doris_storage2
# Run on all three machines
sh bin/start_be.sh --daemon

Broker Deployment (Optional)

Start the broker on all nodes:

# Start broker on each machine
sh bin/start_broker.sh --daemon

Accessing Doris

Use the MySQL client to connect to the FE node (default port 9030). The first login has no password; you can set one later:

# Connect without password
mysql -h doris1 -P 9030 -u root
# Set password
set password for 'root' = password('root');

Scaling FE/BE/Broker

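An FE node runs as either a Follower or an Observer; the Leader is elected from among the Followers and cannot be added directly. Register or remove nodes with SQL from an existing FE, and start each new FE for the first time with --helper pointing at a running FE (for example, sh bin/start_fe.sh --helper hadoop102:9010 --daemon):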

ALTER SYSTEM ADD FOLLOWER "hadoop103:9010";
ALTER SYSTEM ADD OBSERVER "hadoop104:9010";
ALTER SYSTEM DROP FOLLOWER "hadoop103:9010";

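BE nodes are added or removed with the statements below. DROP BACKEND removes the node without migrating its data first; ALTER SYSTEM DECOMMISSION BACKEND moves the data to other nodes before removal and is the safer choice: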

ALTER SYSTEM ADD BACKEND "hadoop102:9050";
ALTER SYSTEM DROP BACKEND "hadoop102:9050";
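
When several BE nodes join at once, the statements can be generated from a host list instead of being typed by hand. This is a minimal sketch under the assumption that every BE uses the heartbeat port 9050 shown above; add_backend_sql is a hypothetical helper name.

```shell
# Hypothetical helper: print one ALTER SYSTEM ADD BACKEND statement per host.
# Assumes every BE listens on heartbeat port 9050, as in this guide.
add_backend_sql() {
  for host in "$@"; do
    printf 'ALTER SYSTEM ADD BACKEND "%s:9050";\n' "$host"
  done
}

add_backend_sql hadoop102 hadoop103 hadoop104
```

After reviewing the output, it can be piped into the client, for example: add_backend_sql hadoop103 | mysql -hhadoop102 -P 9030 -uroot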

Doris Operations Manual

User Management

# Connect to Doris
mysql -hhadoop102 -P 9030 -uroot
# Create a user
create user 'test' identified by 'test';
exit;
# Login with the new user
mysql -hhadoop102 -P 9030 -utest -ptest

Table Operations

# Create a database
create database test_db;
# Grant privileges
grant all on test_db.* to test;
# Use the database
use test_db;

Single‑Partition Table

CREATE TABLE student (
  id INT,
  name VARCHAR(50),
  age INT,
  count BIGINT SUM DEFAULT '0'
) AGGREGATE KEY (id,name,age)
DISTRIBUTED BY HASH(id) BUCKETS 10
PROPERTIES ("replication_num" = "1");

Range‑Partitioned Table

CREATE TABLE student2 (
  dt DATE,
  id INT,
  name VARCHAR(50),
  age INT,
  count BIGINT SUM DEFAULT '0'
) AGGREGATE KEY (dt,id,name,age)
PARTITION BY RANGE(dt) (
  PARTITION p202007 VALUES LESS THAN ('2020-08-01'),
  PARTITION p202008 VALUES LESS THAN ('2020-09-01'),
  PARTITION p202009 VALUES LESS THAN ('2020-10-01')
)
DISTRIBUTED BY HASH(id) BUCKETS 10
PROPERTIES ("replication_num" = "1");
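
Each VALUES LESS THAN bound is exclusive, so a row dated 2020-08-01 lands in p202008, and a row dated 2020-10-01 or later matches no partition. The sketch below mimics that lookup for the three partitions of student2; partition_for_date is a hypothetical helper, and it relies on ISO dates sorting correctly as plain strings.

```shell
# Hypothetical helper: report which student2 partition a date falls into.
# Bounds are copied from the CREATE TABLE statement above; each is exclusive.
partition_for_date() {
  awk -v d="$1" 'BEGIN {
    n = split("p202007 2020-08-01 p202008 2020-09-01 p202009 2020-10-01", a, " ")
    # Walk the (name, upper bound) pairs in order; first bound above d wins
    for (i = 1; i < n; i += 2) {
      if (d < a[i + 1]) { print a[i]; exit 0 }
    }
    print "NO_PARTITION"
    exit 1
  }'
}

partition_for_date 2020-07-15
partition_for_date 2020-08-01
```

A date past the last bound prints NO_PARTITION and returns a non-zero status, which corresponds to rows that Doris cannot place until a matching partition is added.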

Data Model Overview

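AGGREGATE KEY : rows with the same key columns are merged, and each value column is combined using its declared aggregation function (SUM for count in the tables above).

UNIQUE KEY : rows with the same key are deduplicated; the most recently loaded row replaces older ones.

DUPLICATE KEY : rows are stored as‑is without aggregation or deduplication.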
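
The three models can be compared on a small sample. The sketch below only simulates the semantics with awk outside of Doris; the sample rows and the function names are made up for illustration, and the key is assumed to be (id,name,age) as in the student table.

```shell
# Made-up sample rows in load order: id,name,age,count
rows='1,alice,20,5
1,alice,20,7
2,bob,30,1'

# AGGREGATE KEY(id,name,age) with count SUM: rows sharing a key are summed
aggregate_model() {
  printf '%s\n' "$1" | awk -F, '{ sum[$1 "," $2 "," $3] += $4 }
    END { for (k in sum) print k "," sum[k] }' | sort
}

# UNIQUE KEY(id,name,age): the last loaded row for each key wins
unique_model() {
  printf '%s\n' "$1" | awk -F, '{ last[$1 "," $2 "," $3] = $0 }
    END { for (k in last) print last[k] }' | sort
}

# DUPLICATE KEY: every row is kept exactly as loaded
duplicate_model() {
  printf '%s\n' "$1"
}

echo "aggregate:"; aggregate_model "$rows"
echo "unique:";    unique_model "$rows"
echo "duplicate:"; duplicate_model "$rows"
```

For the key (1,alice,20), the aggregate model yields a count of 12, the unique model keeps only the row with count 7, and the duplicate model keeps both rows.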

Data Import Methods

Broker Load (Asynchronous)

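Broker Load is asynchronous: the statement returns as soon as the job is submitted. FE then generates the execution plan and distributes it to the BE nodes, which pull the data through the Broker process.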

LOAD LABEL test_db.label1 (
  DATA INFILE("hdfs://bigdata:8020/student")
  INTO TABLE student
  COLUMNS TERMINATED BY ","
  (id,name,age,count)
  SET (id=id, name=name, age=age, count=count)
) WITH BROKER broker_name (
  "username"="root"
) PROPERTIES ("timeout" = "3600");
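
Each load job carries a label that must be unique within its database, so scripted loads usually derive the label from the table name and a timestamp rather than hard-coding label1. The sketch below renders the statement above with a generated label; make_load_label and broker_load_sql are hypothetical helper names, and the HDFS path, table, and broker name are simply the values used in this guide.

```shell
# Hypothetical helper: build a label such as student_20240101_120000
make_load_label() {
  printf '%s_%s\n' "$1" "$(date +%Y%m%d_%H%M%S)"
}

# Hypothetical helper: render this guide's Broker Load statement for a label
broker_load_sql() {
  cat <<EOF
LOAD LABEL test_db.$1 (
  DATA INFILE("hdfs://bigdata:8020/student")
  INTO TABLE student
  COLUMNS TERMINATED BY ","
  (id,name,age,count)
  SET (id=id, name=name, age=age, count=count)
) WITH BROKER broker_name (
  "username"="root"
) PROPERTIES ("timeout" = "3600");
EOF
}

broker_load_sql "$(make_load_label student)"
```

Because the job runs asynchronously, its state can be checked afterwards with SHOW LOAD WHERE LABEL = "<label>".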

Routine Load (Streaming)

Continuously ingest data from Kafka.

# Create Kafka topic
kafka-topics.sh --zookeeper bigdata:2181 --create --replication-factor 1 --partitions 1 --topic test
# Produce JSON records
{kafka JSON example}
# Create target table in Doris
CREATE TABLE kafka_student (
  id INT,
  name VARCHAR(50),
  age INT,
  count BIGINT SUM DEFAULT '0'
) AGGREGATE KEY (id,name,age)
DISTRIBUTED BY HASH(id) BUCKETS 10
PROPERTIES ("replication_num" = "1");
# Create routine load job
CREATE ROUTINE LOAD test_db.job1 ON kafka_student
PROPERTIES (
  "desired_concurrent_number"="1",
  "strict_mode"="false",
  "format"="json"
)
FROM KAFKA (
  "kafka_broker_list"="bigdata:9092",
  "kafka_topic"="test",
  "property.group.id"="test"
);
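
With "format"="json" and no jsonpaths property, Doris matches the top-level JSON keys against the column names of the target table. The sketch below prints records shaped for kafka_student; it is an illustration only, the values are made up, and student_json is a hypothetical helper name.

```shell
# Hypothetical helper: print one JSON record whose keys are named after the
# kafka_student columns (id, name, age, count).
# The name is inserted verbatim, so it must not contain quotes or backslashes.
student_json() {
  printf '{"id":%d,"name":"%s","age":%d,"count":%d}\n' "$1" "$2" "$3" "$4"
}

student_json 1 alice 20 5
student_json 2 bob 30 1
```

Assuming the Kafka release in use still ships the --broker-list option, the lines could be sent to the topic with: student_json 1 alice 20 5 | kafka-console-producer.sh --broker-list bigdata:9092 --topic test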

Data Export

EXPORT TABLE test_db.student
PARTITION (student)
TO "hdfs://bigdata:8020/doris/student/"
WITH BROKER broker_name (
  "username" = "root"
);

Code Integration

Spark

Add Maven dependencies for Spark and the MySQL connector (the original project also lists Hive, Kafka, and others, which the example below does not use).

Read Doris data via JDBC:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object ReadDoris {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("testReadDoris").setMaster("local[*]")
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()
    // Doris speaks the MySQL protocol, so the FE query port (9030) works as a JDBC endpoint
    val df = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://bigdata:9030/test_db")
      .option("user", "root")
      .option("password", "root")
      .option("dbtable", "student")
      .load()
    df.show()
    spark.close()
  }
}

Flink

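Add Maven dependencies for Flink (the original project also lists Kafka, Elasticsearch, Redis, and others). The JDBC example below additionally needs the Flink JDBC connector and a MySQL driver on the classpath.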

Read Doris data using the JDBC connector:

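import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

// The class name is illustrative; the original shows only the main method
public class ReadDorisFlink {
    public static void main(String[] args) {
        // Batch mode: read the table once and exit
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
        TableEnvironment tEnv = TableEnvironment.create(settings);
        // Map the Doris table through the JDBC connector (FE query port 9030)
        String sourceSql = "CREATE TABLE student (\n" +
            "  `id` INT,\n" +
            "  `name` STRING,\n" +
            "  `age` INT\n" +
            ") WITH (\n" +
            "  'connector' = 'jdbc',\n" +
            "  'url' = 'jdbc:mysql://bigdata:9030/test_db',\n" +
            "  'username' = 'root',\n" +
            "  'password' = 'root',\n" +
            "  'table-name' = 'student'\n" +
            ")";
        tEnv.executeSql(sourceSql);
        Table table = tEnv.sqlQuery("select * from student");
        table.execute().print();
    }
}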

Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.