How to Build a High‑Performance Doris Stream Load Service in Java

This guide walks through the complete architecture, Maven dependencies, configuration classes, annotation‑driven field mapping, utility mappers, a parallel Stream Load core, response handling, and performance tuning for integrating Apache Doris with a Spring Boot backend.

Overall Architecture

├── src
│   ├── main
│   │   ├── java
│   │   │   └── com
│   │   │       └── example
│   │   │           └── doris
│   │   │               ├── annotation
│   │   │               │   └── DorisField.java
│   │   │               ├── config
│   │   │               │   ├── DorisConfig.java
│   │   │               │   └── DorisStreamLoadProperties.java
│   │   │               ├── core
│   │   │               │   └── DorisStreamLoader.java
│   │   │               ├── entity
│   │   │               │   └── User.java
│   │   │               ├── service
│   │   │               │   └── UserService.java
│   │   │               └── util
│   │   │                   ├── DorisMapper.java
│   │   │                   └── TableUtils.java
│   │   └── resources
│   │       └── application.yml

1. Dependency Configuration (pom.xml)

<dependencies>
    <!-- Spring Boot Starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- HTTP client -->
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.5.13</version>
    </dependency>
    <!-- Jackson -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.datatype</groupId>
        <artifactId>jackson-datatype-jsr310</artifactId>
    </dependency>
    <!-- Lombok -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <!-- Guava -->
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>31.1-jre</version>
    </dependency>
    <!-- Reflections (runtime class scanning) -->
    <dependency>
        <groupId>org.reflections</groupId>
        <artifactId>reflections</artifactId>
        <version>0.10.2</version>
    </dependency>
    <!-- Fastjson (parses the load response in DorisStreamLoader); version shown is an example -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.83</version>
    </dependency>
    <!-- Hutool (DateUtil in DorisMapper); version shown is an example -->
    <dependency>
        <groupId>cn.hutool</groupId>
        <artifactId>hutool-all</artifactId>
        <version>5.8.16</version>
    </dependency>
    <!-- MyBatis-Plus annotations (@TableName in TableUtils and User); version shown is an example -->
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>mybatis-plus-annotation</artifactId>
        <version>3.5.3.1</version>
    </dependency>
</dependencies>

2. Configuration Properties Class

DorisStreamLoadProperties.java

package com.example.doris.config;

import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

@Getter
@Setter
@Configuration
@ConfigurationProperties(prefix = "doris.stream-load")
public class DorisStreamLoadProperties {
    private String url;
    private String username;
    private String password;
    private int connectTimeout = 30000;
    private int socketTimeout = 60000;
    private int batchSize = 50000;
    private int maxRetries = 3;
    private String compression = "none";
    private int maxParallel = 4;
    private String columnSeparator = "\\x01";
    private String lineSeparator = "\n";
}

3. Field Mapping Annotation

DorisField.java

package com.example.doris.annotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
public @interface DorisField {
    /** Doris table column name (defaults to field name) */
    String value() default "";
    /** Field order – smaller numbers appear first */
    int order() default Integer.MAX_VALUE;
    /** Whether to ignore this field */
    boolean ignore() default false;
}

4. Automatic Mapping Utility

DorisMapper.java

package com.example.doris.util;

import cn.hutool.core.date.DateUtil;
import com.example.doris.annotation.DorisField;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.lang.reflect.Field;
import java.util.*;
import java.util.stream.Collectors;

public class DorisMapper {
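    // Register available modules (including JSR-310) so java.time fields can be converted,
    // and write them as ISO-8601 strings instead of numeric arrays
    private static final ObjectMapper objectMapper = new ObjectMapper()
            .findAndRegisterModules()
            .disable(com.fasterxml.jackson.databind.SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);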

    /** Convert entity list to a list of ordered maps for Doris */
    public static <T> List<Map<String, Object>> convertToDorisData(List<T> entities, Map<String, Object> additionalFields) {
        if (entities == null || entities.isEmpty()) {
            return Collections.emptyList();
        }
        Class<?> clazz = entities.get(0).getClass();
        List<Field> orderedFields = getOrderedFields(clazz);
        return entities.stream().map(entity -> {
            Map<String, Object> map = new LinkedHashMap<>();
            if (additionalFields != null) {
                map.putAll(additionalFields);
            }
            for (Field field : orderedFields) {
                try {
                    field.setAccessible(true);
                    DorisField annotation = field.getAnnotation(DorisField.class);
                    String fieldName = annotation.value().isEmpty() ? field.getName() : annotation.value();
                    Object value = field.get(entity);
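                    // Format Date and LocalDateTime values as "yyyy-MM-dd HH:mm:ss"; nulls and other types pass through
                    if (value instanceof Date) {
                        value = DateUtil.format((Date) value, "yyyy-MM-dd HH:mm:ss");
                    } else if (value instanceof java.time.LocalDateTime) {
                        value = ((java.time.LocalDateTime) value)
                                .format(java.time.format.DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
                    }
                    map.put(fieldName, value);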
                } catch (IllegalAccessException e) {
                    throw new RuntimeException("Field access failed: " + field.getName(), e);
                }
            }
            return map;
        }).collect(Collectors.toList());
    }

    /** Convert entity list to Doris data using Jackson serialization */
    public static <T> List<Map<String, Object>> convertWithJackson(List<T> entities, Map<String, Object> additionalFields) {
        return entities.stream().map(entity -> {
            Map<String, Object> map = objectMapper.convertValue(entity, new TypeReference<LinkedHashMap<String, Object>>() {});
            if (additionalFields != null) {
                map.putAll(additionalFields);
            }
            return map;
        }).collect(Collectors.toList());
    }

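    /** Per-class cache so the reflection scan and sort run once per entity type */
    private static final Map<Class<?>, List<Field>> FIELD_CACHE = new java.util.concurrent.ConcurrentHashMap<>();

    /** Get fields annotated with @DorisField, sorted by order and not ignored (cached per class) */
    private static List<Field> getOrderedFields(Class<?> clazz) {
        return FIELD_CACHE.computeIfAbsent(clazz, c -> Arrays.stream(c.getDeclaredFields())
                .filter(f -> f.isAnnotationPresent(DorisField.class))
                .filter(f -> !f.getAnnotation(DorisField.class).ignore())
                .sorted(Comparator.comparingInt(f -> f.getAnnotation(DorisField.class).order()))
                .collect(Collectors.toList()));
    }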
}

5. Doris Stream Load Core Class

DorisStreamLoader.java

package com.example.doris.core;

import com.alibaba.fastjson.JSONObject;
import com.example.doris.config.DorisStreamLoadProperties;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.HttpEntityWrapper;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicHeader;
import org.apache.http.util.EntityUtils;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.zip.GZIPOutputStream;

@Slf4j
@Component
public class DorisStreamLoader {
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    private final DorisStreamLoadProperties properties;
    private final String authEncoding;
    private final CloseableHttpClient httpClient;
    private final ExecutorService executorService;

    public DorisStreamLoader(@Qualifier("dorisStreamLoadProperties") DorisStreamLoadProperties properties) {
        this.properties = properties;
        this.authEncoding = Base64.getEncoder().encodeToString((properties.getUsername() + ":" + properties.getPassword()).getBytes(StandardCharsets.UTF_8));
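        this.httpClient = HttpClients.custom()
                .setConnectionTimeToLive(60, TimeUnit.SECONDS)
                .setMaxConnTotal(properties.getMaxParallel() * 2)
                .setMaxConnPerRoute(properties.getMaxParallel())
                // When the URL points at an FE, Stream Load is answered with a 307 redirect to a BE.
                // HttpClient does not follow redirects for PUT by default, so allow it explicitly.
                .setRedirectStrategy(new org.apache.http.impl.client.DefaultRedirectStrategy() {
                    @Override
                    protected boolean isRedirectable(String method) {
                        return true;
                    }
                })
                .build();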
        this.executorService = Executors.newFixedThreadPool(properties.getMaxParallel());
    }

    /** Parallel stream load */
    public void parallelStreamLoad(List<Map<String, Object>> data, String format, String dbName, String tableName) {
        if (data.isEmpty()) return;
        List<List<Map<String, Object>>> batches = Lists.partition(data, properties.getBatchSize());
        CompletableFuture.allOf(batches.stream()
                .map(batch -> CompletableFuture.runAsync(() -> {
                    try {
                        sendBatch(batch, format, dbName, tableName);
                    } catch (Exception e) {
                        log.error("Stream Load batch failed", e);
                        throw new RuntimeException(e);
                    }
                }, executorService)).toArray(CompletableFuture[]::new)).join();
    }

    private void sendBatch(List<Map<String, Object>> batch, String format, String dbName, String tableName) {
        String payload = "csv".equalsIgnoreCase(format) ? convertToCsv(batch) : convertToJson(batch);
        attemptSendWithRetry(payload, format, 0, dbName, tableName);
    }

    private void attemptSendWithRetry(String data, String format, int retryCount, String dbName, String tableName) {
        // Retry on the calling thread so the batch's CompletableFuture completes only after the
        // load has succeeded or exhausted its retries, and the final failure reaches the caller.
        for (int attempt = retryCount; ; attempt++) {
            try {
                sendToDoris(data, format, dbName, tableName);
                log.info("Stream Load succeeded");
                return;
            } catch (Exception e) {
                if (attempt >= properties.getMaxRetries()) {
                    log.error("Stream Load reached max retries", e);
                    throw new RuntimeException("Stream Load batch failed: " + e.getMessage(), e);
                }
                long delay = 1000L << attempt; // exponential back-off: 1 s, 2 s, 4 s, ...
                log.warn("Stream Load failed, retry {} after {} ms", attempt + 1, delay, e);
                try {
                    Thread.sleep(delay);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("Interrupted while waiting to retry Stream Load", ie);
                }
            }
        }
    }

    private String convertToJson(List<Map<String, Object>> batch) {
        try {
            return new ObjectMapper().writeValueAsString(batch);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("JSON conversion failed", e);
        }
    }

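    private String convertToCsv(List<Map<String, Object>> batch) {
        // The configured separator is in the escaped header form (e.g. "\x01"); the payload needs the real character
        String columnSeparator = decodeHexEscape(properties.getColumnSeparator());
        return batch.stream()
                .map(row -> row.values().stream()
                        .map(v -> v == null ? "\\N" : v.toString())
                        .collect(Collectors.joining(columnSeparator)))
                .collect(Collectors.joining(properties.getLineSeparator()));
    }

    /** Turn a "\xNN" escape into the character it denotes; any other string is returned unchanged */
    private static String decodeHexEscape(String separator) {
        if (separator != null && separator.matches("\\\\x[0-9a-fA-F]{2}")) {
            return String.valueOf((char) Integer.parseInt(separator.substring(2), 16));
        }
        return separator;
    }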

    private void sendToDoris(String data, String format, String dbName, String tableName) throws IOException {
        HttpPut httpPut = new HttpPut(String.format(properties.getUrl(), dbName, tableName));
        httpPut.setHeader("Authorization", "Basic " + authEncoding);
        RequestConfig config = RequestConfig.custom()
                .setConnectTimeout(properties.getConnectTimeout())
                .setSocketTimeout(properties.getSocketTimeout())
                // Doris's Stream Load examples send "Expect: 100-continue"; enable that handshake here
                .setExpectContinueEnabled(true)
                .build();
        httpPut.setConfig(config);
        if ("csv".equalsIgnoreCase(format)) {
            httpPut.setHeader("Content-Type", "text/plain");
            httpPut.setHeader("format", "csv");
            httpPut.setHeader("column_separator", properties.getColumnSeparator());
            // A header value cannot carry a raw line break, so send the escaped form "\n"
            httpPut.setHeader("line_delimiter", properties.getLineSeparator().replace("\n", "\\n"));
        } else {
            httpPut.setHeader("Content-Type", "application/json");
            httpPut.setHeader("format", "json");
            httpPut.setHeader("strip_outer_array", "true");
        }
        HttpEntity entity;
        if ("gzip".equals(properties.getCompression())) {
            httpPut.setHeader("compress_type", "gz");
            entity = new GzipCompressingEntity(data);
        } else {
            entity = new StringEntity(data, StandardCharsets.UTF_8);
        }
        httpPut.setEntity(entity);
        try (CloseableHttpResponse response = httpClient.execute(httpPut)) {
            int statusCode = response.getStatusLine().getStatusCode();
            LoadResponse result = JSONObject.parseObject(EntityUtils.toString(response.getEntity()), LoadResponse.class);
            if (!"Success".equals(result.getStatus())) {
                log.error("Stream Load failed: HTTP {} - {}", statusCode, result);
                throw new IOException("Doris Stream Load failed: " + result);
            }
            log.info("Imported {} rows, time {} ms", result.getNumberLoadedRows(), result.getLoadTimeMs());
        }
    }

    /** GZIP compressing entity */
    static class GzipCompressingEntity extends HttpEntityWrapper {
        public GzipCompressingEntity(String data) throws IOException {
            super(createEntity(data));
        }
        private static HttpEntity createEntity(String data) throws IOException {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (GZIPOutputStream gzip = new GZIPOutputStream(bos)) {
                gzip.write(data.getBytes(StandardCharsets.UTF_8));
            }
            return new ByteArrayEntity(bos.toByteArray());
        }
        @Override
        public Header getContentEncoding() {
            return new BasicHeader("Content-Encoding", "gzip");
        }
    }
}
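
The batching and fan-out performed by parallelStreamLoad can be tried in isolation with only the JDK. The following is a minimal sketch rather than the loader itself: ParallelBatchSketch, partition, loadAll, and the sender callback are hypothetical names, and a plain subList loop stands in for Guava's Lists.partition. It illustrates that join() returns only once every batch has finished, and that a failure in any batch reaches the caller wrapped in a CompletionException.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

final class ParallelBatchSketch {

    /** Split rows into consecutive batches of at most batchSize elements. */
    static <T> List<List<T>> partition(List<T> rows, int batchSize) {
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < rows.size(); start += batchSize) {
            batches.add(rows.subList(start, Math.min(start + batchSize, rows.size())));
        }
        return batches;
    }

    /**
     * Send every batch on a fixed pool and block until all of them finish.
     * Returns the number of batches; if any sender call throws, join() rethrows
     * the failure wrapped in a CompletionException.
     */
    static <T> int loadAll(List<T> rows, int batchSize, int parallelism, Consumer<List<T>> sender) {
        List<List<T>> batches = partition(rows, batchSize);
        ExecutorService pool = Executors.newFixedThreadPool(parallelism);
        try {
            CompletableFuture<?>[] futures = batches.stream()
                    .map(batch -> CompletableFuture.runAsync(() -> sender.accept(batch), pool))
                    .toArray(CompletableFuture[]::new);
            CompletableFuture.allOf(futures).join();
            return batches.size();
        } finally {
            pool.shutdown(); // the sketch owns its pool; a long-lived service would reuse one
        }
    }
}
```

Callers that need to know which batch failed could collect the individual futures and inspect them after join() instead of relying on the single aggregated exception.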

6. Response Class

LoadResponse.java

package com.example.doris.core;

import lombok.Data;

/**
 * Body of a Stream Load response. Fields follow Java camelCase naming; Lombok generates
 * the same getters and setters as it would for capitalized field names.
 */
@Data
public class LoadResponse {
    private long txnId;
    private String label;
    private String comment;
    private boolean twoPhaseCommit;
    private String status;
    private String message;
    private int numberTotalRows;
    private int numberLoadedRows;
    private int numberFilteredRows;
    private int numberUnselectedRows;
    private long loadBytes; // long, since a large load can exceed Integer.MAX_VALUE bytes
    private int loadTimeMs;
    private int beginTxnTimeMs;
    private int streamLoadPutTimeMs;
    private int readDataTimeMs;
    private int writeDataTimeMs;
    private int receiveDataTimeMs;
    private int commitAndPublishTimeMs;
}
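
The loader decides success from the Status field of this response rather than from the HTTP status code. According to the Doris Stream Load documentation, Status can be Success, Publish Timeout, Label Already Exists, or Fail, and Publish Timeout also means the import completed, with the data possibly becoming visible after a delay. A minimal sketch of classifying those values follows; LoadStatusSketch and its method names are hypothetical, and treating Publish Timeout as committed is an assumption to confirm against the documentation for the Doris version in use.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

final class LoadStatusSketch {

    // Status strings that, per the Doris documentation, mean the rows were imported.
    private static final Set<String> COMMITTED =
            new HashSet<>(Arrays.asList("Success", "Publish Timeout"));

    /** True when the status indicates the rows were imported and no retry is needed. */
    static boolean isCommitted(String status) {
        return status != null && COMMITTED.contains(status);
    }

    /** True when Doris reports that the label of this request was already used. */
    static boolean isDuplicateLabel(String status) {
        return "Label Already Exists".equals(status);
    }
}
```

With such a helper, retrying a batch whose status is Publish Timeout would be avoided, since resending it could load the same rows twice.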

7. Table Name Utility

TableUtils.java

package com.example.doris.util;

import com.baomidou.mybatisplus.annotation.TableName;

public class TableUtils {
    public static <T> String getTableName(Class<T> entityClass) {
        TableName annotation = entityClass.getAnnotation(TableName.class);
        if (annotation != null) {
            return annotation.value();
        }
        // Default: camelCase to snake_case
        return entityClass.getSimpleName()
                .replaceAll("([A-Z])", "_$1")
                .toLowerCase()
                .substring(1);
    }
}
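
When no @TableName annotation is present, TableUtils falls back to a regex-based camelCase to snake_case conversion. The sketch below repeats that expression in isolation to show what it yields for typical class names; SnakeCaseSketch and toSnakeCase are hypothetical names used only here. Note that every capital letter gets its own underscore, so acronyms are split letter by letter.

```java
final class SnakeCaseSketch {

    /**
     * Same expression as the TableUtils fallback: prefix each capital with "_",
     * lower-case the result, then drop the leading underscore.
     * Assumes the name starts with a capital letter, as Java class names normally do.
     */
    static String toSnakeCase(String simpleName) {
        return simpleName
                .replaceAll("([A-Z])", "_$1")
                .toLowerCase()
                .substring(1);
    }

    public static void main(String[] args) {
        System.out.println(toSnakeCase("User"));       // user
        System.out.println(toSnakeCase("UserOrder"));  // user_order
        System.out.println(toSnakeCase("HTTPServer")); // h_t_t_p_server
    }
}
```

If entity names contain acronyms, annotating the class with @TableName is the simpler way to get the intended table name.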

8. Entity Example

User.java

package com.example.doris.entity;

import com.baomidou.mybatisplus.annotation.TableName;
import com.example.doris.annotation.DorisField;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@TableName("user")
public class User {
    @DorisField(order = 1)
    private Long id;

    @DorisField(value = "user_name", order = 2)
    private String name;

    @DorisField(order = 3)
    private Integer age;

    @DorisField(value = "account_status", order = 4)
    private Integer status;

    @DorisField(ignore = true)
    private String password; // ignored

    private String email; // no annotation, ignored

    @DorisField(order = 5)
    private LocalDateTime createTime;
}

9. Service Layer Implementation

UserService.java

package com.example.doris.service;

import com.example.doris.core.DorisStreamLoader;
import com.example.doris.entity.User;
import com.example.doris.util.DorisMapper;
import com.example.doris.util.TableUtils;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Service
@RequiredArgsConstructor
public class UserService {
    private final DorisStreamLoader dorisStreamLoader;

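    /**
     * Batch update user status.
     * UserProvider is an application-specific helper that is not shown in this article;
     * its return value is passed to the loader as the Doris database name.
     */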
    public void batchUpdateUserStatus(List<User> users) {
        Map<String, Object> additionalFields = new HashMap<>();
        additionalFields.put("update_time", LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
        List<Map<String, Object>> dorisData = DorisMapper.convertToDorisData(users, additionalFields);
        dorisStreamLoader.parallelStreamLoad(dorisData, "json", UserProvider.getUser().getTenantDbConnectionString(), TableUtils.getTableName(User.class));
    }

    /** Batch create users */
    public void batchCreateUsers(List<User> users) {
        Map<String, Object> additionalFields = new HashMap<>();
        additionalFields.put("create_time", LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
        List<Map<String, Object>> dorisData = DorisMapper.convertToDorisData(users, additionalFields);
        dorisStreamLoader.parallelStreamLoad(dorisData, "json", UserProvider.getUser().getTenantDbConnectionString(), TableUtils.getTableName(User.class));
    }
}

10. Configuration Class

DorisConfig.java

package com.example.doris.config;

import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;

/**
 * Enables binding of the doris.stream-load.* properties.
 * DorisStreamLoader is already registered through its @Component annotation, so no
 * separate @Bean method is declared here; a second definition with the same bean name
 * is redundant and can fail at startup when bean overriding is disabled.
 */
@Configuration
@EnableConfigurationProperties(DorisStreamLoadProperties.class)
public class DorisConfig {
}

11. Application YAML

doris:
  stream-load:
    url: "http://doris-fe:8030/api/%s/%s/_stream_load"   # %s = database, then table (filled by String.format)
    username: admin
    password: "secure_password"
    connect-timeout: 60000   # 60 seconds
    socket-timeout: 300000   # 5 minutes
    batch-size: 100000       # 100k rows per batch
    max-retries: 5
    compression: gzip
    max-parallel: 8         # 8 threads

spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
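
DorisStreamLoader passes the configured url through String.format together with the database and table names, so the names only take effect if the value contains two %s placeholders. A minimal sketch of that substitution follows; StreamLoadUrlSketch and buildUrl are hypothetical names, and the host and port are the ones used in the YAML above.

```java
final class StreamLoadUrlSketch {

    /** Fill the database and table names into a template with two %s placeholders. */
    static String buildUrl(String template, String dbName, String tableName) {
        return String.format(template, dbName, tableName);
    }

    public static void main(String[] args) {
        // Template with placeholders: the arguments end up in the path.
        System.out.println(buildUrl("http://doris-fe:8030/api/%s/%s/_stream_load", "user_db", "user"));
        // Template without placeholders: String.format ignores the extra arguments.
        System.out.println(buildUrl("http://doris-fe:8030/api/user_db/user_table/_stream_load", "other_db", "user"));
    }
}
```

A URL without placeholders therefore always targets the database and table written in the configuration, whatever the caller passes in.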

Solution Features & Advantages

Automatic field mapping via @DorisField supports renaming, ordering, and ignoring fields.

High‑performance processing: batches of 100k rows, 8‑thread parallelism, optional GZIP compression, and exponential back‑off retries.

Smart cache for reflected fields to avoid repeated reflection.

Result monitoring with detailed LoadResponse logging.
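
How much the optional GZIP compression saves depends on the data, and it is easy to measure before enabling it. The following is a minimal sketch using only the JDK; GzipPayloadSketch, its method names, and the generated sample rows are hypothetical and stand in for a real batch. It compresses a payload the same way the loader's GzipCompressingEntity does and decompresses it again to confirm the round trip.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

final class GzipPayloadSketch {

    private static final char SEPARATOR = 0x01; // same column separator as the default config

    /** Compress a UTF-8 payload with GZIP. */
    static byte[] gzip(String payload) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(bos)) {
            out.write(payload.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    /** Decompress GZIP bytes back into a UTF-8 string. */
    static String gunzip(byte[] compressed) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            byte[] buffer = new byte[8192];
            int n;
            while ((n = in.read(buffer)) != -1) {
                bos.write(buffer, 0, n);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(bos.toByteArray(), StandardCharsets.UTF_8);
    }

    /** Build a repetitive CSV-like payload standing in for a batch of rows. */
    static String samplePayload(int rows) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < rows; i++) {
            sb.append(i).append(SEPARATOR).append("user_").append(i % 10)
              .append(SEPARATOR).append(30).append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String payload = samplePayload(1000);
        byte[] packed = gzip(payload);
        System.out.println("raw bytes: " + payload.getBytes(StandardCharsets.UTF_8).length
                + ", gzip bytes: " + packed.length);
    }
}
```

Compression trades network traffic for CPU time on both the client and the BE, so it tends to pay off mainly when the network is the bottleneck.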

Performance Optimization Suggestions

Adjust batch size according to cluster capacity, e.g., batch-size: 50000.

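Increase parallelism, e.g., set max-parallel to about twice the number of CPU cores. Spring property placeholders do not evaluate arithmetic, so an expression such as ${CPU_CORES*2} has to be computed in code or replaced with an explicit number.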

Enable GZIP compression to reduce network traffic.

Increase timeouts for large data loads: connect-timeout: 120000, socket-timeout: 600000.

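Raise the Doris limits on load size and concurrent transactions. streaming_load_max_mb is a BE configuration item and max_running_txn_num_per_db is an FE configuration item, so they are changed through configuration rather than with SET GLOBAL:

# be.conf (BE configuration file)
streaming_load_max_mb = 4096

-- FE configuration, run from a MySQL client
ADMIN SET FRONTEND CONFIG ("max_running_txn_num_per_db" = "1024");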
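
Batch size and parallelism together determine how many Stream Load requests a job issues and how many run at once. The sketch below shows one way to derive both numbers in code; LoadSizingSketch and its method names are hypothetical, and the factor of two per core is the rule of thumb from the suggestions above rather than a measured optimum.

```java
final class LoadSizingSketch {

    /** Twice the number of available cores, never less than one. */
    static int defaultParallelism() {
        return Math.max(1, Runtime.getRuntime().availableProcessors() * 2);
    }

    /** Number of requests needed to send totalRows in batches of batchSize (ceiling division). */
    static long batchCount(long totalRows, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        return (totalRows + batchSize - 1) / batchSize;
    }

    public static void main(String[] args) {
        System.out.println("parallelism: " + defaultParallelism());
        System.out.println("batches for 250,000 rows at 100,000 per batch: " + batchCount(250_000, 100_000));
    }
}
```

Each batch opens its own transaction on the Doris side, so the batch count is worth comparing against the per-database transaction limit before raising parallelism.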