
Boost Your Data Ingestion: A High‑Performance Java Stream Load Architecture for Doris

This article presents a complete Java‑based architecture for high‑throughput Doris stream loading, covering project structure, Maven dependencies, configuration properties, field‑mapping annotations, automatic mapper utilities, a robust parallel loader with retry and compression, plus performance tuning recommendations.

Code Ape Tech Column

Overall Architecture

├── src
│   ├── main
│   │   ├── java
│   │   │   └── com
│   │   │       └── example
│   │   │           └── doris
│   │   │               ├── annotation
│   │   │               │   └── DorisField.java
│   │   │               ├── config
│   │   │               │   ├── DorisConfig.java
│   │   │               │   └── DorisStreamLoadProperties.java
│   │   │               ├── core
│   │   │               │   ├── DorisStreamLoader.java
│   │   │               │   └── LoadResponse.java
│   │   │               ├── entity
│   │   │               │   └── User.java
│   │   │               ├── service
│   │   │               │   └── UserService.java
│   │   │               └── util
│   │   │                   ├── DorisMapper.java
│   │   │                   └── TableUtils.java
│   │   └── resources
│   │       └── application.yml

1. Dependency Configuration (pom.xml)

<dependencies>
    <!-- Spring Boot Starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- HTTP client -->
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.5.13</version>
    </dependency>
    <!-- Jackson -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.datatype</groupId>
        <artifactId>jackson-datatype-jsr310</artifactId>
    </dependency>
    <!-- Lombok -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <!-- Guava -->
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>31.1-jre</version>
    </dependency>
    <!-- Reflections (runtime annotation scanning) -->
    <dependency>
        <groupId>org.reflections</groupId>
        <artifactId>reflections</artifactId>
        <version>0.10.2</version>
    </dependency>
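    <!-- MyBatis-Plus annotations (@TableName, used by TableUtils and User) -->
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>mybatis-plus-annotation</artifactId>
        <version>3.5.3.1</version>
    </dependency>
</dependencies>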

2. Configuration Properties Class

package com.example.doris.config;

import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

@Getter
@Setter
@Configuration
@ConfigurationProperties(prefix = "doris.stream-load")
public class DorisStreamLoadProperties {
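    private String url;                       // template with two %s placeholders: database, then table
    private String username;
    private String password;
    private int connectTimeout = 30000;       // ms
    private int socketTimeout = 60000;        // ms
    private int batchSize = 50000;            // rows per Stream Load request
    private int maxRetries = 3;               // retries after the first failed attempt
    private String compression = "none";      // "none" or "gzip"
    private int maxParallel = 4;              // concurrent Stream Load requests
    private String columnSeparator = "\\x01"; // Doris hex notation for the byte 0x01
    private String lineSeparator = "\n";      // Doris default line delimiter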
}

3. Field‑Mapping Annotation

package com.example.doris.annotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
public @interface DorisField {
    /** Doris table column name (defaults to field name) */
    String value() default "";
    /** Order of the column – smaller values appear first */
    int order() default Integer.MAX_VALUE;
    /** Whether to ignore this field */
    boolean ignore() default false;
}
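The following sketch shows how value(), order() and ignore() combine in a small, self-contained program. It re-declares the annotation locally so the file compiles on its own. The Sample class is hypothetical. The filtering, sorting and renaming rules are the same ones DorisMapper applies.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class DorisFieldDemo {

    /** Local copy of the article's annotation so this file compiles on its own. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    public @interface DorisField {
        String value() default "";
        int order() default Integer.MAX_VALUE;
        boolean ignore() default false;
    }

    /** Hypothetical entity used only for this demonstration. */
    public static class Sample {
        @DorisField(order = 2) Integer age;
        @DorisField(value = "user_name", order = 1) String name;
        @DorisField(ignore = true) String password; // excluded by ignore = true
        String email;                                // excluded: no annotation
        @DorisField Long id;                         // no order, so it sorts last
    }

    /** Annotated, non-ignored fields sorted by order(), renamed by value() when it is set. */
    public static List<String> columns(Class<?> clazz) {
        return Arrays.stream(clazz.getDeclaredFields())
                .filter(f -> f.isAnnotationPresent(DorisField.class))
                .filter(f -> !f.getAnnotation(DorisField.class).ignore())
                .sorted(Comparator.comparingInt(f -> f.getAnnotation(DorisField.class).order()))
                .map(f -> {
                    String name = f.getAnnotation(DorisField.class).value();
                    return name.isEmpty() ? f.getName() : name;
                })
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(columns(Sample.class)); // [user_name, age, id]
    }
}
```

A field without an explicit order keeps the default Integer.MAX_VALUE and sorts after every ordered field. When two fields share the same order value, they come out in getDeclaredFields() order, which the JVM does not guarantee. Giving every mapped field a distinct order keeps the column sequence predictable.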

4. Automatic Mapping Utility

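package com.example.doris.util;

import com.example.doris.annotation.DorisField;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import java.lang.reflect.Field;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

public class DorisMapper {
    /** Jackson with Java time support (LocalDateTime etc.) */
    private static final ObjectMapper objectMapper = new ObjectMapper()
            .registerModule(new JavaTimeModule())
            .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
    /** Doris DATETIME literal format */
    private static final DateTimeFormatter DATETIME = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    /** Ordered, accessible @DorisField fields, computed once per entity class */
    private static final Map<Class<?>, List<Field>> FIELD_CACHE = new ConcurrentHashMap<>();

    /** Convert a list of entities to a list of ordered maps for Doris */
    public static <T> List<Map<String, Object>> convertToDorisData(List<T> entities, Map<String, Object> additionalFields) {
        if (entities == null || entities.isEmpty()) {
            return Collections.emptyList();
        }
        List<Field> orderedFields = FIELD_CACHE.computeIfAbsent(entities.get(0).getClass(), DorisMapper::getOrderedFields);
        return entities.stream().map(entity -> {
            Map<String, Object> map = new LinkedHashMap<>();
            if (additionalFields != null) {
                map.putAll(additionalFields);
            }
            for (Field field : orderedFields) {
                DorisField annotation = field.getAnnotation(DorisField.class);
                String column = annotation.value().isEmpty() ? field.getName() : annotation.value();
                try {
                    map.put(column, formatValue(field.get(entity)));
                } catch (IllegalAccessException e) {
                    throw new IllegalStateException("Field access failed: " + field.getName(), e);
                }
            }
            return map;
        }).collect(Collectors.toList());
    }

    /** Same conversion via Jackson; uses Jackson property names, @DorisField is not consulted */
    public static <T> List<Map<String, Object>> convertWithJackson(List<T> entities, Map<String, Object> additionalFields) {
        if (entities == null || entities.isEmpty()) {
            return Collections.emptyList();
        }
        return entities.stream().map(entity -> {
            Map<String, Object> map = objectMapper.convertValue(entity, new TypeReference<LinkedHashMap<String, Object>>() {});
            if (additionalFields != null) {
                map.putAll(additionalFields);
            }
            return map;
        }).collect(Collectors.toList());
    }

    /** Renders date-time values as "yyyy-MM-dd HH:mm:ss" (system time zone); other values pass through */
    private static Object formatValue(Object value) {
        if (value instanceof LocalDateTime) {
            return ((LocalDateTime) value).format(DATETIME);
        }
        if (value instanceof Date) {
            // getTime() also works for java.sql.Date, whose toInstant() throws
            return LocalDateTime.ofInstant(Instant.ofEpochMilli(((Date) value).getTime()), ZoneId.systemDefault()).format(DATETIME);
        }
        return value;
    }

    private static List<Field> getOrderedFields(Class<?> clazz) {
        List<Field> fields = Arrays.stream(clazz.getDeclaredFields())
                .filter(f -> f.isAnnotationPresent(DorisField.class))
                .filter(f -> !f.getAnnotation(DorisField.class).ignore())
                .sorted(Comparator.comparingInt(f -> f.getAnnotation(DorisField.class).order()))
                .collect(Collectors.toList());
        fields.forEach(f -> f.setAccessible(true)); // once per class instead of once per row
        return fields;
    }
}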
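The ordered field list only needs to be computed once per entity class. A map keyed by Class is one way to cache it. Another option, sketched here, is java.lang.ClassValue. It computes a value lazily for each Class and returns the stored result on later lookups. This is an illustrative alternative, not the article's code. Row and the SCANS counter are hypothetical and exist only to make the caching visible.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

public class FieldListCache {

    /** Counts how often the reflection scan actually runs (demonstration only). */
    public static final AtomicInteger SCANS = new AtomicInteger();

    /** Computes the field list lazily per Class and returns the stored list afterwards. */
    private static final ClassValue<List<Field>> FIELDS = new ClassValue<List<Field>>() {
        @Override
        protected List<Field> computeValue(Class<?> type) {
            SCANS.incrementAndGet();
            List<Field> fields = Arrays.stream(type.getDeclaredFields())
                    .filter(f -> !Modifier.isStatic(f.getModifiers())) // the @DorisField filter and sort would go here
                    .collect(Collectors.toList());
            fields.forEach(f -> f.setAccessible(true)); // once per class, not once per row
            return Collections.unmodifiableList(fields);
        }
    };

    public static List<Field> fieldsOf(Class<?> type) {
        return FIELDS.get(type);
    }

    /** Hypothetical entity. */
    public static class Row {
        long id;
        String name;
    }

    public static void main(String[] args) {
        for (int i = 0; i < 1000; i++) {
            fieldsOf(Row.class);
        }
        System.out.println(fieldsOf(Row.class).size() + " fields, scanned " + SCANS.get() + " time(s)");
        // 2 fields, scanned 1 time(s)
    }
}
```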

5. Doris Stream Load Core Class

package com.example.doris.core;

import com.example.doris.config.DorisStreamLoadProperties;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHeaders;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.zip.GZIPOutputStream;

/** Stream Load client; registered once, through the {@code @Bean} method in DorisConfig. */
@Slf4j
public class DorisStreamLoader implements AutoCloseable {
    /** Shared, thread-safe mapper: Java time support for payloads, lenient parsing of Doris responses */
    private static final ObjectMapper MAPPER = JsonMapper.builder()
            .addModule(new JavaTimeModule())
            .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
            .enable(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES) // Doris replies with "Status", "NumberLoadedRows", ...
            .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
            .build();

    private final DorisStreamLoadProperties properties;
    private final String authEncoding;
    private final CloseableHttpClient httpClient;
    private final ExecutorService executorService;

    public DorisStreamLoader(DorisStreamLoadProperties properties) {
        this.properties = properties;
        this.authEncoding = Base64.getEncoder().encodeToString(
                (properties.getUsername() + ":" + properties.getPassword()).getBytes(StandardCharsets.UTF_8));
        this.httpClient = HttpClients.custom()
                .setConnectionTimeToLive(60, TimeUnit.SECONDS)
                .setMaxConnTotal(properties.getMaxParallel() * 2)
                .setMaxConnPerRoute(properties.getMaxParallel())
                // The FE answers with a 307 redirect to a BE; HttpClient does not follow redirects for PUT by default
                .setRedirectStrategy(new DefaultRedirectStrategy() {
                    @Override
                    protected boolean isRedirectable(String method) {
                        return true;
                    }
                })
                .build();
        this.executorService = Executors.newFixedThreadPool(properties.getMaxParallel());
    }

    /** Splits data into batches and loads them in parallel; throws if any batch finally fails */
    public void parallelStreamLoad(List<Map<String, Object>> data, String format, String dbName, String tableName) {
        if (data == null || data.isEmpty()) {
            return;
        }
        List<List<Map<String, Object>>> batches = Lists.partition(data, properties.getBatchSize());
        CompletableFuture<?>[] futures = batches.stream()
                .map(batch -> CompletableFuture.runAsync(() -> sendBatch(batch, format, dbName, tableName), executorService))
                .toArray(CompletableFuture[]::new);
        try {
            CompletableFuture.allOf(futures).join();
        } catch (CompletionException e) {
            throw new IllegalStateException("Stream Load failed for " + dbName + "." + tableName, e.getCause());
        }
    }

    /** Retries synchronously in the worker thread, so the batch's future completes only after the last attempt */
    private void sendBatch(List<Map<String, Object>> batch, String format, String dbName, String tableName) {
        boolean csv = "csv".equalsIgnoreCase(format);
        byte[] payload = (csv ? convertToCsv(batch) : convertToJson(batch)).getBytes(StandardCharsets.UTF_8);
        // One label per batch, reused by every retry, so Doris rejects a duplicate of an attempt that already committed
        String label = tableName + "_" + UUID.randomUUID();
        String columns = csv ? String.join(",", batch.get(0).keySet()) : null;
        for (int attempt = 0; ; attempt++) {
            try {
                sendToDoris(payload, csv, columns, label, dbName, tableName);
                return;
            } catch (IOException e) {
                if (attempt >= properties.getMaxRetries()) {
                    log.error("Stream Load reached max retries, label {}", label, e);
                    throw new IllegalStateException("Stream Load batch failed: " + e.getMessage(), e);
                }
                long delay = 1000L << attempt; // 1 s, 2 s, 4 s, ...
                log.warn("Stream Load failed, retry {} after {}ms", attempt + 1, delay, e);
                try {
                    TimeUnit.MILLISECONDS.sleep(delay);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while retrying Stream Load", ie);
                }
            }
        }
    }

    private String convertToJson(List<Map<String, Object>> batch) {
        try {
            return MAPPER.writeValueAsString(batch);
        } catch (JsonProcessingException e) {
            throw new IllegalStateException("JSON conversion failed", e);
        }
    }

    /** Joins values with the real separator characters; \N is read by Doris as NULL */
    private String convertToCsv(List<Map<String, Object>> batch) {
        String columnSep = decodeSeparator(properties.getColumnSeparator());
        String lineSep = decodeSeparator(properties.getLineSeparator());
        return batch.stream()
                .map(row -> row.values().stream()
                        .map(v -> v == null ? "\\N" : v.toString())
                        .collect(Collectors.joining(columnSep)))
                .collect(Collectors.joining(lineSep));
    }

    /** Header notation such as "\\x01" becomes the single character written into the payload */
    private static String decodeSeparator(String sep) {
        if (sep.length() == 4 && sep.startsWith("\\x")) {
            return String.valueOf((char) Integer.parseInt(sep.substring(2), 16));
        }
        return sep;
    }

    private void sendToDoris(byte[] payload, boolean csv, String columns, String label,
                             String dbName, String tableName) throws IOException {
        // The configured url is a template with two %s placeholders: database, then table
        HttpPut httpPut = new HttpPut(String.format(properties.getUrl(), dbName, tableName));
        httpPut.setHeader(HttpHeaders.AUTHORIZATION, "Basic " + authEncoding);
        httpPut.setHeader("label", label);
        httpPut.setConfig(RequestConfig.custom()
                .setConnectTimeout(properties.getConnectTimeout())
                .setSocketTimeout(properties.getSocketTimeout())
                .setExpectContinueEnabled(true) // lets the FE redirect before the body is sent
                .build());
        if (csv) {
            httpPut.setHeader("format", "csv");
            httpPut.setHeader("columns", columns); // maps CSV positions to table columns
            httpPut.setHeader("column_separator", properties.getColumnSeparator());
            if (!"\n".equals(properties.getLineSeparator())) {
                // "\n" is already the Doris default, and a raw newline is not a valid header value
                httpPut.setHeader("line_delimiter", properties.getLineSeparator());
            }
        } else {
            httpPut.setHeader("format", "json");
            httpPut.setHeader("strip_outer_array", "true");
        }
        byte[] body = payload;
        // compress_type is documented for CSV loads; JSON support depends on the Doris version
        if (csv && "gzip".equalsIgnoreCase(properties.getCompression())) {
            httpPut.setHeader("compress_type", "gz");
            body = gzip(payload);
        }
        httpPut.setEntity(new ByteArrayEntity(body));
        try (CloseableHttpResponse response = httpClient.execute(httpPut)) {
            int statusCode = response.getStatusLine().getStatusCode();
            String responseBody = response.getEntity() == null ? ""
                    : EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
            if (statusCode != 200) {
                throw new IOException("Doris Stream Load HTTP " + statusCode + ": " + responseBody);
            }
            JsonNode json = MAPPER.readTree(responseBody);
            String status = json.path("Status").asText();
            boolean loaded = "Success".equals(status)
                    || "Publish Timeout".equals(status) // committed, visible with a delay
                    || ("Label Already Exists".equals(status)
                        && "FINISHED".equals(json.path("ExistingJobStatus").asText())); // an earlier attempt committed
            if (!loaded) {
                log.error("Stream Load failed: HTTP {} - {}", statusCode, responseBody);
                throw new IOException("Doris Stream Load failed: " + status + " - " + json.path("Message").asText());
            }
            LoadResponse result = MAPPER.treeToValue(json, LoadResponse.class);
            log.info("Loaded {} rows into {}.{} in {} ms (label {})",
                    result.getNumberLoadedRows(), dbName, tableName, result.getLoadTimeMs(), label);
        }
    }

    private static byte[] gzip(byte[] data) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(data);
        }
        return bos.toByteArray();
    }

    /** Spring calls close() on shutdown (inferred destroy method of the bean) */
    @Override
    public void close() throws IOException {
        executorService.shutdown();
        httpClient.close();
    }
}
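Two details of the CSV path are easy to get wrong. First, the column_separator header uses Doris's hex notation (\x01), but the payload itself must contain the actual 0x01 byte. Second, without a columns header Doris maps CSV fields to table columns by position. The sketch below isolates that logic and makes two assumptions. The helper names (decodeSeparator, columnsHeader, toCsv) are hypothetical. Only the single-byte \xHH form is decoded.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class CsvPayloadSketch {

    /** Header notation such as "\\x01" (4 characters) to the single character written into the payload. */
    public static String decodeSeparator(String sep) {
        if (sep.length() == 4 && sep.startsWith("\\x")) {
            return String.valueOf((char) Integer.parseInt(sep.substring(2), 16));
        }
        return sep; // already a literal separator such as ","
    }

    /** Value for the Stream Load "columns" header: the key order of the first row. */
    public static String columnsHeader(List<Map<String, Object>> rows) {
        return String.join(",", rows.get(0).keySet());
    }

    /** Joins values with the decoded separators; null becomes \N, which Doris treats as NULL in CSV. */
    public static String toCsv(List<Map<String, Object>> rows, String columnSep, String lineSep) {
        String col = decodeSeparator(columnSep);
        String line = decodeSeparator(lineSep);
        return rows.stream()
                .map(row -> row.values().stream()
                        .map(v -> v == null ? "\\N" : v.toString())
                        .collect(Collectors.joining(col)))
                .collect(Collectors.joining(line));
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("id", 1L);
        row.put("user_name", "alice");
        row.put("age", null);
        List<Map<String, Object>> rows = Arrays.asList(row, row);
        System.out.println(columnsHeader(rows)); // id,user_name,age
        // Show 0x01 as '|' to make it visible: two lines of 1|alice|\N
        System.out.println(toCsv(rows, "\\x01", "\n").replace((char) 1, '|'));
    }
}
```

Values that contain the separator or a newline would still break rows. Using 0x01 as the separator makes that unlikely but does not rule it out.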

6. API Response Class (LoadResponse)

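package com.example.doris.core;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Data;

/** Doris Stream Load response; Doris sends UpperCamelCase keys ("TxnId", "Status", ...), so it is parsed case-insensitively */
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class LoadResponse {
    private long txnId;
    private String label;
    private String comment;
    private boolean twoPhaseCommit;
    private String status;             // e.g. "Success", "Publish Timeout", "Label Already Exists", "Fail"
    private String existingJobStatus;  // present when status is "Label Already Exists"
    private String message;
    private long numberTotalRows;
    private long numberLoadedRows;
    private long numberFilteredRows;
    private long numberUnselectedRows;
    private long loadBytes;
    private long loadTimeMs;
    private long beginTxnTimeMs;
    private long streamLoadPutTimeMs;
    private long readDataTimeMs;
    private long writeDataTimeMs;
    private long receiveDataTimeMs;
    private long commitAndPublishTimeMs;
    private String errorURL;           // details about filtered rows, when present
}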
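The Status field decides whether a batch is done. The Doris Stream Load documentation describes Success and Publish Timeout as committed loads; with Publish Timeout the data only becomes visible later. Label Already Exists means a load with the same label was submitted before, and ExistingJobStatus tells whether that load finished. The sketch below encodes that decision as a pure function, using a hypothetical Outcome enum. Check the status list against the documentation for your Doris version.

```java
public class LoadStatusCheck {

    /** Hypothetical outcome type for one Stream Load attempt. */
    public enum Outcome { LOADED, PENDING, FAILED }

    /**
     * status is the response's "Status" field; existingJobStatus is "ExistingJobStatus",
     * which Doris fills in when the label was already used.
     */
    public static Outcome classify(String status, String existingJobStatus) {
        if (status == null) {
            return Outcome.FAILED;
        }
        switch (status) {
            case "Success":
            case "Publish Timeout": // committed; data may become visible with a delay
                return Outcome.LOADED;
            case "Label Already Exists": // an earlier load with this label exists
                if ("FINISHED".equals(existingJobStatus)) {
                    return Outcome.LOADED;
                }
                return "RUNNING".equals(existingJobStatus) ? Outcome.PENDING : Outcome.FAILED;
            default: // "Fail" or anything unexpected
                return Outcome.FAILED;
        }
    }

    public static void main(String[] args) {
        System.out.println(classify("Publish Timeout", null));           // LOADED
        System.out.println(classify("Label Already Exists", "RUNNING")); // PENDING
        System.out.println(classify("Fail", null));                      // FAILED
    }
}
```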

7. Table Name Utility

package com.example.doris.util;

import com.baomidou.mybatisplus.annotation.TableName;
import java.util.Locale;

public class TableUtils {
    // Uses the @TableName value if present, otherwise the class name in snake_case (UserAccount becomes user_account)
    public static String getTableName(Class<?> entityClass) {
        TableName tableName = entityClass.getAnnotation(TableName.class);
        if (tableName != null && !tableName.value().trim().isEmpty()) {
            return tableName.value().trim();
        }
        return entityClass.getSimpleName()
                .replaceAll("(?<=[a-z0-9])([A-Z])", "_$1")
                .toLowerCase(Locale.ROOT);
    }
}
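When a class has no @TableName, its table name is derived from the class name. The sketch below isolates that conversion as a stand-alone, illustrative helper. It inserts an underscore only where a lowercase letter or digit is followed by an uppercase letter, so a leading capital needs no special handling.

```java
import java.util.Locale;

public class TableNameSketch {

    /** CamelCase class name to snake_case: underscore only between a lowercase letter/digit and an uppercase letter. */
    public static String toSnakeCase(String simpleName) {
        return simpleName.replaceAll("(?<=[a-z0-9])([A-Z])", "_$1").toLowerCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        System.out.println(toSnakeCase("User"));              // user
        System.out.println(toSnakeCase("DishAppManagement")); // dish_app_management
        System.out.println(toSnakeCase("HTTPLog"));           // httplog (acronyms are not split)
    }
}
```

Acronyms collapse into one word. Classes such as HTTPLog are better served by an explicit @TableName.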

8. Entity Example (User)

package com.example.doris.entity;

import com.baomidou.mybatisplus.annotation.TableName;
import com.example.doris.annotation.DorisField;
import lombok.*;
import java.time.LocalDateTime;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@TableName("user")
public class User {
    @DorisField(order = 1)
    private Long id;

    @DorisField(value = "user_name", order = 2)
    private String name;

    @DorisField(order = 3)
    private Integer age;

    @DorisField(value = "account_status", order = 4)
    private Integer status;

    @DorisField(ignore = true)
    private String password; // ignored

    private String email; // no annotation, not mapped

    @DorisField(order = 5)
    private LocalDateTime createTime;
}

9. Service Layer Implementation

package com.example.doris.service;

import com.example.doris.core.DorisStreamLoader;
import com.example.doris.entity.User;
import com.example.doris.util.DorisMapper;
import com.example.doris.util.TableUtils;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;

@Service
@RequiredArgsConstructor
public class UserService {
    // Doris DATETIME literal format
    private static final DateTimeFormatter DORIS_DATETIME = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    // Target database (as in the sample URL); a multi-tenant application could resolve it per request instead
    private static final String DB_NAME = "user_db";

    private final DorisStreamLoader dorisStreamLoader;

    /** Batch update user status (on a Unique Key table, a row with an existing key replaces the old row) */
    public void batchUpdateUserStatus(List<User> users) {
        load(users, "update_time");
    }

    /** Batch create users */
    public void batchCreateUsers(List<User> users) {
        load(users, "create_time");
    }

    /** Stamps every row with the current time in timeColumn and loads the batch as JSON */
    private void load(List<User> users, String timeColumn) {
        Map<String, Object> additionalFields = new HashMap<>();
        additionalFields.put(timeColumn, LocalDateTime.now().format(DORIS_DATETIME));
        List<Map<String, Object>> dorisData = DorisMapper.convertToDorisData(users, additionalFields);
        dorisStreamLoader.parallelStreamLoad(dorisData, "json", DB_NAME, TableUtils.getTableName(User.class));
    }
}
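Both service methods stamp rows with the current time. Doris represents DATETIME values as yyyy-MM-dd HH:mm:ss. Formatting timestamps with that pattern avoids depending on whether a given Doris version accepts the ISO "T" separator. The class name below is illustrative.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class DorisDateTimeFormat {

    /** yyyy-MM-dd HH:mm:ss, the form Doris uses for DATETIME values. */
    public static final DateTimeFormatter DORIS_DATETIME = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    public static String format(LocalDateTime time) {
        return time == null ? null : time.format(DORIS_DATETIME);
    }

    public static void main(String[] args) {
        LocalDateTime t = LocalDateTime.of(2024, 5, 1, 8, 30, 15);
        System.out.println(format(t));                                       // 2024-05-01 08:30:15
        System.out.println(t.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)); // 2024-05-01T08:30:15
    }
}
```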

10. Configuration Bean

package com.example.doris.config;

import com.example.doris.core.DorisStreamLoader;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableConfigurationProperties(DorisStreamLoadProperties.class)
public class DorisConfig {
    @Bean
    public DorisStreamLoader dorisStreamLoader(@Qualifier("dorisStreamLoadProperties") DorisStreamLoadProperties properties) {
        return new DorisStreamLoader(properties);
    }
}

11. Application YAML

doris:
  stream-load:
    url: http://doris-fe:8030/api/%s/%s/_stream_load   # %s placeholders: database, then table
    username: admin
    password: "secure_password"
    connect-timeout: 60000   # 60 seconds
    socket-timeout: 300000   # 5 minutes
    batch-size: 100000       # 100k rows per batch
    max-retries: 5           # up to 5 retries
    compression: gzip        # enable GZIP
    max-parallel: 8          # 8 parallel threads

spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8

Solution Highlights and Advantages

Automatic field mapping via @DorisField supports renaming, ordering and ignoring fields.

High‑performance processing: batches of 100k rows, 8 parallel loads, optional GZIP compression, and retries with exponential back‑off (values from the sample application.yml).

Reflection results (the ordered, annotated field list) are computed once and reused for every row, rather than rescanned per entity.

Result monitoring: each successful load logs the number of loaded rows and the load time, and each failure logs the full Doris response.
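Exponential back‑off can be written as a blocking retry loop that runs inside the worker thread. With that structure, a batch's future completes only after its final attempt. The sketch below is generic: RetryWithBackoff is a hypothetical helper, not the article's code. With baseDelayMs = 1000 and maxRetries = 5, the waits are 1 s, 2 s, 4 s, 8 s and 16 s.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

public class RetryWithBackoff {

    /**
     * Runs task; on failure waits baseDelayMs * 2^attempt and tries again, at most maxRetries times.
     * Blocks the calling thread, and rethrows the last exception once retries are exhausted.
     */
    public static <T> T call(Callable<T> task, int maxRetries, long baseDelayMs) throws Exception {
        for (int attempt = 0; ; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                if (attempt >= maxRetries) {
                    throw e;
                }
                Thread.sleep(baseDelayMs << attempt); // base, 2x, 4x, ...
            }
        }
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger calls = new AtomicInteger();
        String result = call(() -> {
            if (calls.incrementAndGet() < 3) {
                throw new IllegalStateException("transient failure");
            }
            return "loaded";
        }, 5, 10);
        System.out.println(result + " after " + calls.get() + " attempts"); // loaded after 3 attempts
    }
}
```

A retried Stream Load is only safe against duplicate rows when every attempt of a batch carries the same label (Doris's label header). An attempt that timed out on the client may still have committed on the server.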

Performance Optimization Suggestions

Adjust batch size according to Doris cluster capacity, e.g., batch-size: 50000.

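Increase parallelism, typically to about twice the number of client CPU cores. Write it as a literal value (e.g., max-parallel: 16 on an 8‑core host), since property placeholders do not evaluate expressions such as ${CPU_CORES*2}.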

Enable compression (compression: gzip) to reduce network traffic, at the cost of some client CPU time.

Increase timeouts for large data volumes, e.g., connect-timeout: 120000, socket-timeout: 600000.

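Cluster‑side tuning: raise streaming_load_max_mb (BE config, maximum size of a single Stream Load) and max_running_txn_num_per_db (FE config, concurrent load transactions per database) as needed.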
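Whether gzip pays off depends on how compressible the rows are. The sketch below is a quick check with java.util.zip on a synthetic CSV batch. The data is illustrative, so measure with a sample of real rows. Compression mirrors the loader's GZIP path: the bytes are sent with compress_type: gz so that Doris decompresses them.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

public class GzipPayloadSize {

    /** Gzip-compresses a payload in memory. */
    public static byte[] gzip(byte[] data) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(data);
        } // closing the stream writes the gzip trailer
        return bos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        // Synthetic CSV batch: 10,000 rows with 0x01 column separators (illustrative data only)
        StringBuilder csv = new StringBuilder();
        for (int i = 0; i < 10_000; i++) {
            csv.append(i).append((char) 1).append("active").append((char) 1)
               .append("2024-05-01 08:30:15").append('\n');
        }
        byte[] raw = csv.toString().getBytes(StandardCharsets.UTF_8);
        byte[] packed = gzip(raw);
        System.out.printf("raw %d bytes, gzip %d bytes (%.1f%%)%n",
                raw.length, packed.length, 100.0 * packed.length / raw.length);
    }
}
```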

Written by

Code Ape Tech Column

Former Ant Group P8 engineer, pure technologist, sharing full‑stack Java, job interview and career advice through a column. Site: java-family.cn
