How to Build a High‑Performance Doris Stream Load Service in Java
This guide walks through the complete architecture, Maven dependencies, configuration classes, annotation‑driven field mapping, utility mappers, a parallel Stream Load core, response handling, and performance tuning for integrating Apache Doris with a Spring Boot backend.
Overall Architecture
├── src
│   ├── main
│   │   ├── java
│   │   │   └── com
│   │   │       └── example
│   │   │           └── doris
│   │   │               ├── annotation
│   │   │               │   └── DorisField.java
│   │   │               ├── config
│   │   │               │   ├── DorisConfig.java
│   │   │               │   └── DorisStreamLoadProperties.java
│   │   │               ├── core
│   │   │               │   └── DorisStreamLoader.java
│   │   │               ├── entity
│   │   │               │   └── User.java
│   │   │               ├── service
│   │   │               │   └── UserService.java
│   │   │               └── util
│   │   │                   └── DorisMapper.java
│   │   └── resources
│   │       └── application.yml
1. Dependency Configuration (pom.xml)
<dependencies>
<!-- Spring Boot Starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- HTTP client -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.13</version>
</dependency>
<!-- Jackson -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- Guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.1-jre</version>
</dependency>
<!-- Reflections (runtime class scanning) -->
<dependency>
<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>
<version>0.10.2</version>
</dependency>
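</dependencies>
Note: later listings also import Hutool (cn.hutool), Fastjson (com.alibaba.fastjson), and the MyBatis-Plus annotations (com.baomidou.mybatisplus.annotation), none of which are declared above. Add them to the pom or replace them with JDK and Jackson equivalents. The Reflections dependency is not used by any listing in this guide.
2. Configuration Properties Class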
DorisStreamLoadProperties.java
package com.example.doris.config;
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
@Getter
@Setter
@Configuration
@ConfigurationProperties(prefix = "doris.stream-load")
public class DorisStreamLoadProperties {
private String url;
private String username;
private String password;
private int connectTimeout = 30000;
private int socketTimeout = 60000;
private int batchSize = 50000;
private int maxRetries = 3;
private String compression = "none";
private int maxParallel = 4;
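// Separators are kept in the escaped notation that Doris expects in the column_separator and line_delimiter headers
private String columnSeparator = "\\x01";
private String lineSeparator = "\\n";
}
3. Field Mapping Annotation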
DorisField.java
package com.example.doris.annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
public @interface DorisField {
/** Doris table column name (defaults to field name) */
String value() default "";
/** Field order – smaller numbers appear first */
int order() default Integer.MAX_VALUE;
/** Whether to ignore this field */
boolean ignore() default false;
}
4. Automatic Mapping Utility
DorisMapper.java
package com.example.doris.util;
import cn.hutool.core.date.DateUtil;
import com.example.doris.annotation.DorisField;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.lang.reflect.Field;
import java.util.*;
import java.util.stream.Collectors;
public class DorisMapper {
private static final ObjectMapper objectMapper = new ObjectMapper();
/** Convert entity list to a list of ordered maps for Doris */
public static <T> List<Map<String, Object>> convertToDorisData(List<T> entities, Map<String, Object> additionalFields) {
if (entities == null || entities.isEmpty()) {
return Collections.emptyList();
}
Class<?> clazz = entities.get(0).getClass();
List<Field> orderedFields = getOrderedFields(clazz);
return entities.stream().map(entity -> {
Map<String, Object> map = new LinkedHashMap<>();
if (additionalFields != null) {
map.putAll(additionalFields);
}
for (Field field : orderedFields) {
try {
field.setAccessible(true);
DorisField annotation = field.getAnnotation(DorisField.class);
String fieldName = annotation.value().isEmpty() ? field.getName() : annotation.value();
Object value = field.get(entity);
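// Format Date and LocalDateTime fields as "yyyy-MM-dd HH:mm:ss"; other values pass through unchanged
if (value instanceof Date) {
value = DateUtil.format((Date) value, "yyyy-MM-dd HH:mm:ss");
} else if (value instanceof java.time.LocalDateTime) {
value = ((java.time.LocalDateTime) value).format(java.time.format.DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
}
map.put(fieldName, value);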
} catch (IllegalAccessException e) {
throw new RuntimeException("Field access failed: " + field.getName(), e);
}
}
return map;
}).collect(Collectors.toList());
}
/** Convert entity list to Doris data using Jackson serialization */
public static <T> List<Map<String, Object>> convertWithJackson(List<T> entities, Map<String, Object> additionalFields) {
return entities.stream().map(entity -> {
Map<String, Object> map = objectMapper.convertValue(entity, new TypeReference<LinkedHashMap<String, Object>>() {});
if (additionalFields != null) {
map.putAll(additionalFields);
}
return map;
}).collect(Collectors.toList());
}
/** Get fields annotated with @DorisField, sorted by order and not ignored */
private static List<Field> getOrderedFields(Class<?> clazz) {
return Arrays.stream(clazz.getDeclaredFields())
.filter(f -> f.isAnnotationPresent(DorisField.class))
.filter(f -> !f.getAnnotation(DorisField.class).ignore())
.sorted(Comparator.comparingInt(f -> f.getAnnotation(DorisField.class).order()))
.collect(Collectors.toList());
}
}
5. Doris Stream Load Core Class
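Because failed batches are retried, it can help to make each batch idempotent. Stream Load accepts a `label` header, and Doris reports `Label Already Exists` when a label is reused within a database. The listing that follows does not set one. Here is a minimal sketch of deriving a repeatable label from the table name and the payload. The names `LoadLabels` and `labelFor` are hypothetical, and the restricted character set and the 128-character cap are conservative assumptions, not limits taken from this guide.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical helper: derives a repeatable Stream Load label for one batch. */
final class LoadLabels {
    private LoadLabels() {}

    /** The same table and payload always yield the same label, so a retry reuses it. */
    static String labelFor(String tableName, String payload) {
        try {
            MessageDigest sha = MessageDigest.getInstance("SHA-256");
            byte[] digest = sha.digest(payload.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (int i = 0; i < 16; i++) { // 16 bytes become 32 hex characters
                hex.append(String.format("%02x", digest[i] & 0xff));
            }
            // Assumption: keep only letters, digits and underscores, and cap the length at 128.
            String safeTable = tableName.replaceAll("[^A-Za-z0-9_]", "_");
            String label = "sl_" + safeTable + "_" + hex;
            // When too long, keep the tail so the hash part always survives.
            return label.length() <= 128 ? label : label.substring(label.length() - 128);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 should be available on every Java platform", e);
        }
    }
}
```

In `sendToDoris` this could be applied with `httpPut.setHeader("label", LoadLabels.labelFor(tableName, data));`. One trade-off: a batch that is intentionally loaded twice with identical content would produce the same label, so a job or request identifier may need to be mixed into the hash.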
DorisStreamLoader.java
package com.example.doris.core;
import com.alibaba.fastjson.JSONObject;
import com.example.doris.config.DorisStreamLoadProperties;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.HttpEntityWrapper;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicHeader;
import org.apache.http.util.EntityUtils;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.zip.GZIPOutputStream;
@Slf4j
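// Registered as a bean by DorisConfig (section 10); adding @Component here as well would define the same bean twice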
public class DorisStreamLoader {
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
private final DorisStreamLoadProperties properties;
private final String authEncoding;
private final CloseableHttpClient httpClient;
private final ExecutorService executorService;
public DorisStreamLoader(@Qualifier("dorisStreamLoadProperties") DorisStreamLoadProperties properties) {
this.properties = properties;
this.authEncoding = Base64.getEncoder().encodeToString((properties.getUsername() + ":" + properties.getPassword()).getBytes(StandardCharsets.UTF_8));
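this.httpClient = HttpClients.custom()
.setConnectionTimeToLive(60, TimeUnit.SECONDS)
.setMaxConnTotal(properties.getMaxParallel() * 2)
.setMaxConnPerRoute(properties.getMaxParallel())
// The FE answers Stream Load with a 307 redirect to a BE; HttpClient does not follow redirects for PUT unless told to
.setRedirectStrategy(new org.apache.http.impl.client.DefaultRedirectStrategy() {
@Override
protected boolean isRedirectable(String method) {
return true;
}
})
.build();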
this.executorService = Executors.newFixedThreadPool(properties.getMaxParallel());
}
/** Parallel stream load */
public void parallelStreamLoad(List<Map<String, Object>> data, String format, String dbName, String tableName) {
if (data.isEmpty()) return;
List<List<Map<String, Object>>> batches = Lists.partition(data, properties.getBatchSize());
CompletableFuture.allOf(batches.stream()
.map(batch -> CompletableFuture.runAsync(() -> {
try {
sendBatch(batch, format, dbName, tableName);
} catch (Exception e) {
log.error("Stream Load batch failed", e);
throw new RuntimeException(e);
}
}, executorService)).toArray(CompletableFuture[]::new)).join();
}
private void sendBatch(List<Map<String, Object>> batch, String format, String dbName, String tableName) {
String payload = "csv".equalsIgnoreCase(format) ? convertToCsv(batch) : convertToJson(batch);
attemptSendWithRetry(payload, format, 0, dbName, tableName);
}
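private void attemptSendWithRetry(String data, String format, int retryCount, String dbName, String tableName) {
// Retry on the calling worker thread so the batch's CompletableFuture only completes after the final attempt.
// Handing the retry to a scheduler would let the future finish early and lose any later failure.
for (int attempt = retryCount; ; attempt++) {
try {
sendToDoris(data, format, dbName, tableName);
log.info("Stream Load succeeded");
return;
} catch (Exception e) {
if (attempt >= properties.getMaxRetries()) {
log.error("Stream Load reached max retries", e);
throw new RuntimeException("Stream Load batch failed: " + e.getMessage(), e);
}
long delay = 1000L * (long) Math.pow(2, attempt); // exponential back-off: 1 s, 2 s, 4 s, ...
log.warn("Stream Load failed, retry {} after {} ms", attempt + 1, delay, e);
try {
Thread.sleep(delay);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted while waiting to retry Stream Load", ie);
}
}
}
}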
private String convertToJson(List<Map<String, Object>> batch) {
try {
return new ObjectMapper().writeValueAsString(batch);
} catch (JsonProcessingException e) {
throw new RuntimeException("JSON conversion failed", e);
}
}
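private String convertToCsv(List<Map<String, Object>> batch) {
// The properties hold header notation (\x01, \n); the payload itself needs the real characters
String columnSeparator = unescapeSeparator(properties.getColumnSeparator());
String lineSeparator = unescapeSeparator(properties.getLineSeparator());
return batch.stream()
.map(row -> row.values().stream()
.map(v -> v == null ? "\\N" : v.toString())
.collect(Collectors.joining(columnSeparator)))
.collect(Collectors.joining(lineSeparator));
}
/** Translate \xNN, \t and \n notation into the character it denotes; other strings pass through */
private static String unescapeSeparator(String separator) {
if (separator.startsWith("\\x")) {
return String.valueOf((char) Integer.parseInt(separator.substring(2), 16));
}
return separator.replace("\\t", "\t").replace("\\n", "\n");
}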
private void sendToDoris(String data, String format, String dbName, String tableName) throws IOException {
HttpPut httpPut = new HttpPut(String.format(properties.getUrl(), dbName, tableName));
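httpPut.setHeader("Authorization", "Basic " + authEncoding);
// Doris' sample clients send Expect: 100-continue so the body is only transmitted once the target node accepts the request
httpPut.setHeader("Expect", "100-continue");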
RequestConfig config = RequestConfig.custom()
.setConnectTimeout(properties.getConnectTimeout())
.setSocketTimeout(properties.getSocketTimeout())
.build();
httpPut.setConfig(config);
if ("csv".equalsIgnoreCase(format)) {
httpPut.setHeader("Content-Type", "text/plain");
httpPut.setHeader("format", "csv");
httpPut.setHeader("column_separator", properties.getColumnSeparator());
httpPut.setHeader("line_delimiter", properties.getLineSeparator());
} else {
httpPut.setHeader("Content-Type", "application/json");
httpPut.setHeader("format", "json");
httpPut.setHeader("strip_outer_array", "true");
}
HttpEntity entity;
if ("gzip".equals(properties.getCompression())) {
httpPut.setHeader("compress_type", "gz");
entity = new GzipCompressingEntity(data);
} else {
entity = new StringEntity(data, StandardCharsets.UTF_8);
}
httpPut.setEntity(entity);
try (CloseableHttpResponse response = httpClient.execute(httpPut)) {
int statusCode = response.getStatusLine().getStatusCode();
LoadResponse result = JSONObject.parseObject(EntityUtils.toString(response.getEntity()), LoadResponse.class);
if (!"Success".equals(result.getStatus())) {
log.error("Stream Load failed: HTTP {} - {}", statusCode, result);
throw new IOException("Doris Stream Load failed: " + result);
}
log.info("Imported {} rows, time {} ms", result.getNumberLoadedRows(), result.getLoadTimeMs());
}
}
/** GZIP compressing entity */
static class GzipCompressingEntity extends HttpEntityWrapper {
public GzipCompressingEntity(String data) throws IOException {
super(createEntity(data));
}
private static HttpEntity createEntity(String data) throws IOException {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
try (GZIPOutputStream gzip = new GZIPOutputStream(bos)) {
gzip.write(data.getBytes(StandardCharsets.UTF_8));
}
return new ByteArrayEntity(bos.toByteArray());
}
@Override
public Header getContentEncoding() {
return new BasicHeader("Content-Encoding", "gzip");
}
}
}
6. Response Class
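The loader treats only `Success` as a successful load. The Doris documentation describes further `Status` values, notably `Publish Timeout` (the transaction is committed but the data may become visible slightly later) and `Label Already Exists` (a request with the same label was seen before, with the earlier job's state reported in `ExistingJobStatus`). Below is a minimal sketch of classifying these cases so that already-committed batches are not retried. The `LoadStatus` class and its `Outcome` enum are hypothetical, `ExistingJobStatus` is not a field of the `LoadResponse` class in this guide, and the status strings should be checked against the Doris version in use.

```java
/** Hypothetical helper that interprets the Status field of a Stream Load response. */
final class LoadStatus {
    private LoadStatus() {}

    enum Outcome { COMMITTED, ALREADY_LOADED, FAILED }

    /**
     * @param status            value of the "Status" field in the response
     * @param existingJobStatus value of "ExistingJobStatus", or null when absent
     */
    static Outcome classify(String status, String existingJobStatus) {
        if ("Success".equals(status) || "Publish Timeout".equals(status)) {
            // Publish Timeout: the load is committed, so retrying would only risk duplicates.
            return Outcome.COMMITTED;
        }
        if ("Label Already Exists".equals(status) && "FINISHED".equals(existingJobStatus)) {
            // An earlier request with the same label already finished.
            return Outcome.ALREADY_LOADED;
        }
        // Everything else, including a null or unknown status, is treated as a failure.
        return Outcome.FAILED;
    }
}
```

With such a helper, `sendToDoris` could throw only when the outcome is `FAILED`, which keeps the retry loop from resending data that Doris has already accepted.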
LoadResponse.java
package com.example.doris.core;
import lombok.Data;
@Data
public class LoadResponse {
private long TxnId;
private String Label;
private String Comment;
private boolean TwoPhaseCommit;
private String Status;
private String Message;
private int NumberTotalRows;
private int NumberLoadedRows;
private int NumberFilteredRows;
private int NumberUnselectedRows;
private long LoadBytes;
private int LoadTimeMs;
private int BeginTxnTimeMs;
private int StreamLoadPutTimeMs;
private int ReadDataTimeMs;
private int WriteDataTimeMs;
private int ReceiveDataTimeMs;
private int CommitAndPublishTimeMs;
}
7. Table Name Utility
TableUtils.java
package com.example.doris.util;
import com.baomidou.mybatisplus.annotation.TableName;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Map;
import java.util.List;
import java.util.ArrayList;
public class TableUtils {
public static <T> String getTableName(Class<T> entityClass) {
TableName annotation = entityClass.getAnnotation(TableName.class);
if (annotation != null) {
return annotation.value();
}
// Default: camelCase to snake_case
return entityClass.getSimpleName()
.replaceAll("([A-Z])", "_$1")
.toLowerCase()
.substring(1);
}
}
8. Entity Example
User.java
package com.example.doris.entity;
import com.baomidou.mybatisplus.annotation.TableName;
import com.example.doris.annotation.DorisField;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@TableName("user")
public class User {
@DorisField(order = 1)
private Long id;
@DorisField(value = "user_name", order = 2)
private String name;
@DorisField(order = 3)
private Integer age;
@DorisField(value = "account_status", order = 4)
private Integer status;
@DorisField(ignore = true)
private String password; // ignored
private String email; // no annotation, ignored
@DorisField(order = 5)
private LocalDateTime createTime;
}
9. Service Layer Implementation
UserService.java
package com.example.doris.service;
import com.example.doris.core.DorisStreamLoader;
import com.example.doris.entity.User;
import com.example.doris.util.DorisMapper;
import com.example.doris.util.TableUtils;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Service
@RequiredArgsConstructor
public class UserService {
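private final DorisStreamLoader dorisStreamLoader;
// Note: UserProvider, used below to resolve the tenant database name, is application-specific and not shown in this guide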
/** Batch update user status */
public void batchUpdateUserStatus(List<User> users) {
Map<String, Object> additionalFields = new HashMap<>();
additionalFields.put("update_time", LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
List<Map<String, Object>> dorisData = DorisMapper.convertToDorisData(users, additionalFields);
dorisStreamLoader.parallelStreamLoad(dorisData, "json", UserProvider.getUser().getTenantDbConnectionString(), TableUtils.getTableName(User.class));
}
/** Batch create users */
public void batchCreateUsers(List<User> users) {
Map<String, Object> additionalFields = new HashMap<>();
additionalFields.put("create_time", LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
List<Map<String, Object>> dorisData = DorisMapper.convertToDorisData(users, additionalFields);
dorisStreamLoader.parallelStreamLoad(dorisData, "json", UserProvider.getUser().getTenantDbConnectionString(), TableUtils.getTableName(User.class));
}
}
10. Configuration Class
DorisConfig.java
package com.example.doris.config;
import com.example.doris.core.DorisStreamLoader;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@EnableConfigurationProperties(DorisStreamLoadProperties.class)
public class DorisConfig {
@Bean
public DorisStreamLoader dorisStreamLoader(@Qualifier("dorisStreamLoadProperties") DorisStreamLoadProperties properties) {
return new DorisStreamLoader(properties);
}
}
11. Application YAML
doris:
  stream-load:
    # The %s placeholders are filled with the database and table name by String.format in DorisStreamLoader
    url: "http://doris-fe:8030/api/%s/%s/_stream_load"
    username: admin
    password: "secure_password"
    connect-timeout: 60000 # 60 seconds
    socket-timeout: 300000 # 5 minutes
    batch-size: 100000 # 100k rows per batch
    max-retries: 5
    compression: gzip
    max-parallel: 8 # 8 threads
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
Solution Features & Advantages
Automatic field mapping via @DorisField supports renaming, ordering, and ignoring fields.
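High-performance processing: batches of up to 100k rows, 8-thread parallelism, optional GZIP compression, and retries with exponential back-off.
Limited reflection overhead: annotated fields are resolved once per convertToDorisData call rather than once per row; caching them per entity class would also remove the repeated lookup across calls.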
Result monitoring with detailed LoadResponse logging.
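The DorisMapper listing resolves the annotated fields again on every call. One way to avoid that is a per-class cache. Here is a minimal sketch using `ConcurrentHashMap.computeIfAbsent`. The class `FieldCacheSketch`, its nested `DorisField` stand-in (a copy of the annotation from section 3 so the sketch compiles on its own), and the `Sample` entity are all hypothetical.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

final class FieldCacheSketch {
    private FieldCacheSketch() {}

    /** Stand-in for the guide's @DorisField so this sketch is self-contained. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    @interface DorisField {
        String value() default "";
        int order() default Integer.MAX_VALUE;
        boolean ignore() default false;
    }

    private static final Map<Class<?>, List<Field>> CACHE = new ConcurrentHashMap<>();

    /** Returns the ordered, non-ignored fields; reflection runs once per class. */
    static List<Field> orderedFields(Class<?> clazz) {
        return CACHE.computeIfAbsent(clazz, FieldCacheSketch::scan);
    }

    private static List<Field> scan(Class<?> clazz) {
        List<Field> fields = Arrays.stream(clazz.getDeclaredFields())
                .filter(f -> f.isAnnotationPresent(DorisField.class))
                .filter(f -> !f.getAnnotation(DorisField.class).ignore())
                .sorted(Comparator.comparingInt((Field f) -> f.getAnnotation(DorisField.class).order()))
                .collect(Collectors.toList());
        fields.forEach(f -> f.setAccessible(true)); // done once here instead of once per row
        return Collections.unmodifiableList(fields);
    }

    /** Hypothetical entity used only to exercise the cache. */
    static class Sample {
        @DorisField(order = 2) String name;
        @DorisField(order = 1) Long id;
        @DorisField(ignore = true) String password;
        String email; // no annotation, so it is skipped
    }
}
```

Calling `FieldCacheSketch.orderedFields(Sample.class)` twice returns the same list instance, so only the first call pays for reflection and sorting.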
Performance Optimization Suggestions
Adjust batch size according to cluster capacity, e.g., batch-size: 50000.
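Increase parallelism toward roughly twice the CPU core count, e.g., max-parallel: 16 on an 8-core host. Spring property placeholders do not evaluate arithmetic, so an expression such as ${CPU_CORES*2} has to be replaced with a concrete number.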
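Enable GZIP compression to reduce network traffic. The Doris documentation describes compress_type for CSV loads, so confirm that your Doris version accepts compressed JSON before combining compression: gzip with the json format.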
Increase timeouts for large data loads: connect-timeout: 120000, socket-timeout: 600000.
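Raise the Doris-side limits if loads are rejected for size or transaction count. Both settings are configuration items rather than session variables, so SET GLOBAL does not apply to them. streaming_load_max_mb is a BE option (for example, streaming_load_max_mb = 4096 in be.conf), and max_running_txn_num_per_db is an FE option that can be changed with:
ADMIN SET FRONTEND CONFIG ("max_running_txn_num_per_db" = "1024");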
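Whether compression pays off depends on the shape of the payload, so it is worth measuring before changing the setting. Below is a minimal sketch that compares raw and GZIP-compressed sizes for a synthetic JSON batch. The class `GzipSizeCheck` and the generated rows are hypothetical; real batches should be substituted for a meaningful number.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

final class GzipSizeCheck {
    private GzipSizeCheck() {}

    /** Compresses the bytes in memory, the same way the loader's gzip entity does. */
    static byte[] gzip(byte[] raw) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(raw);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    /** Builds a synthetic JSON array with repetitive keys, similar in shape to a load batch. */
    static String sampleBatch(int rows) {
        StringBuilder sb = new StringBuilder("[");
        for (int i = 0; i < rows; i++) {
            if (i > 0) {
                sb.append(',');
            }
            sb.append("{\"id\":").append(i)
              .append(",\"user_name\":\"user").append(i)
              .append("\",\"account_status\":1}");
        }
        return sb.append(']').toString();
    }

    public static void main(String[] args) {
        byte[] raw = sampleBatch(10_000).getBytes(StandardCharsets.UTF_8);
        byte[] packed = gzip(raw);
        System.out.printf("raw=%d bytes, gzip=%d bytes%n", raw.length, packed.length);
    }
}
```

Row-oriented JSON repeats its keys on every row, so it tends to compress well; the CPU time spent compressing on the client is the cost to weigh against the saved bandwidth.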
