Boost Your Data Ingestion: A High‑Performance Java Stream Load Architecture for Doris
This article presents a complete Java‑based architecture for high‑throughput Doris stream loading, covering project structure, Maven dependencies, configuration properties, field‑mapping annotations, automatic mapper utilities, a robust parallel loader with retry and compression, plus performance tuning recommendations.
Overall Architecture
├── src
│ ├── main
│ │ ├── java
│ │ │ └── com
│ │ │ └── example
│ │ │ └── doris
│ │ │ ├── annotation
│ │ │ │ └── DorisField.java
│ │ │ ├── config
│ │ │ │ ├── DorisConfig.java
│ │ │ │ └── DorisStreamLoadProperties.java
│ │ │ ├── core
│ │ │ │ └── DorisStreamLoader.java
│ │ │ ├── entity
│ │ │ │ └── User.java
│ │ │ ├── service
│ │ │ │ └── UserService.java
│ │ │ └── util
│ │ │ └── DorisMapper.java
│ │ └── resources
│ │ └── application.yml
1. Dependency Configuration (pom.xml)
<dependencies>
<!-- NOTE(review): the Java sections of this article also import cn.hutool (DateUtil),
     com.alibaba.fastjson (JSONObject) and com.baomidou.mybatisplus (TableName). None of those
     artifacts is declared in this list, so they must be inherited from elsewhere or added here. -->
<!-- Spring Boot Starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- HTTP client -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.13</version>
</dependency>
<!-- Jackson -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- Guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.1-jre</version>
</dependency>
<!-- Reflections (runtime annotation scanning) -->
<!-- NOTE(review): no code shown in this article references org.reflections; confirm it is still needed. -->
<dependency>
<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>
<version>0.10.2</version>
</dependency>
</dependencies>
2. Configuration Properties Class
package com.example.doris.config;
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
/**
 * Connection and tuning settings for Doris Stream Load, bound from the
 * {@code doris.stream-load.*} configuration keys.
 */
@Getter
@Setter
@Configuration
@ConfigurationProperties(prefix = "doris.stream-load")
public class DorisStreamLoadProperties {
    /** Stream Load endpoint; the loader passes it through {@code String.format} with database and table name. */
    private String url;
    /** User name for HTTP Basic authentication. */
    private String username;
    /** Password for HTTP Basic authentication. */
    private String password;
    /** Connect timeout in milliseconds. */
    private int connectTimeout = 30000;
    /** Socket (read) timeout in milliseconds. */
    private int socketTimeout = 60000;
    /** Maximum number of rows sent in one Stream Load request. */
    private int batchSize = 50000;
    /** Number of retries after the first failed attempt of a batch. */
    private int maxRetries = 3;
    /** Payload compression; the loader compresses only when this equals {@code "gzip"}. */
    private String compression = "none";
    /** Size of the worker pool and per-route HTTP connection limit. */
    private int maxParallel = 4;
    /** CSV column separator; the default is the four-character text {@code \x01}. */
    private String columnSeparator = "\\x01";
    /** CSV line separator; defaults to a single line-feed character. */
    private String lineSeparator = "\n";
}
3. Field‑Mapping Annotation
package com.example.doris.annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
 * Marks an entity field as a Doris table column.
 *
 * <p>The annotation is retained at runtime so it can be read through reflection.
 * In this article, DorisMapper skips fields that do not carry it.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
public @interface DorisField {
    /** Doris table column name; an empty value means the Java field name is used. */
    String value() default "";
    /** Position of the column; smaller values appear first. Unset fields sort last. */
    int order() default Integer.MAX_VALUE;
    /** Whether to leave this field out of the mapped row. */
    boolean ignore() default false;
}
4. Automatic Mapping Utility
package com.example.doris.util;
import cn.hutool.core.date.DateUtil;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.lang.reflect.Field;
import java.util.*;
import java.util.stream.Collectors;
/**
 * Converts entities into ordered column maps that can be serialized for Doris Stream Load.
 */
public class DorisMapper {
    /** Text layout applied to date/time values. */
    private static final String DATE_TIME_PATTERN = "yyyy-MM-dd HH:mm:ss";
    /** Thread-safe formatter for {@code java.time.LocalDateTime} values. */
    private static final java.time.format.DateTimeFormatter DATE_TIME_FORMATTER =
            java.time.format.DateTimeFormatter.ofPattern(DATE_TIME_PATTERN);
    private static final ObjectMapper objectMapper = new ObjectMapper();
    /** Ordered, accessible {@code @DorisField} fields per entity class, so reflection runs once per class. */
    private static final Map<Class<?>, List<Field>> FIELD_CACHE = new java.util.concurrent.ConcurrentHashMap<>();

    /**
     * Converts a list of entities to a list of ordered maps for Doris.
     *
     * <p>The entity class is taken from the first element. Each row starts with
     * {@code additionalFields} (if any) followed by the annotated fields in {@code order()} sequence;
     * an annotated field overwrites an additional field that uses the same column name.
     *
     * @param entities         entities to convert; {@code null} or empty yields an empty list
     * @param additionalFields extra column values added to every row, may be {@code null}
     * @return one insertion-ordered map per entity
     * @throws RuntimeException if a field cannot be read reflectively
     */
    public static <T> List<Map<String, Object>> convertToDorisData(List<T> entities, Map<String, Object> additionalFields) {
        if (entities == null || entities.isEmpty()) {
            return Collections.emptyList();
        }
        Class<?> clazz = entities.get(0).getClass();
        List<Field> orderedFields = getOrderedFields(clazz);
        return entities.stream()
                .map(entity -> toRow(entity, orderedFields, additionalFields))
                .collect(Collectors.toList());
    }

    /**
     * Same conversion using Jackson serialization.
     *
     * <p>Here {@code additionalFields} are applied last, so they overwrite entity properties
     * with the same key.
     *
     * @param entities         entities to convert; {@code null} yields an empty list
     * @param additionalFields extra column values added to every row, may be {@code null}
     * @return one insertion-ordered map per entity
     */
    public static <T> List<Map<String, Object>> convertWithJackson(List<T> entities, Map<String, Object> additionalFields) {
        if (entities == null) {
            return Collections.emptyList();
        }
        return entities.stream().map(entity -> {
            Map<String, Object> map = objectMapper.convertValue(entity, new TypeReference<LinkedHashMap<String, Object>>() {});
            if (additionalFields != null) {
                map.putAll(additionalFields);
            }
            return map;
        }).collect(Collectors.toList());
    }

    /** Builds the column map for one entity. */
    private static Map<String, Object> toRow(Object entity, List<Field> orderedFields, Map<String, Object> additionalFields) {
        Map<String, Object> row = new LinkedHashMap<>();
        if (additionalFields != null) {
            row.putAll(additionalFields);
        }
        for (Field field : orderedFields) {
            try {
                DorisField annotation = field.getAnnotation(DorisField.class);
                String column = annotation.value().isEmpty() ? field.getName() : annotation.value();
                row.put(column, normalize(field, field.get(entity)));
            } catch (IllegalAccessException e) {
                throw new RuntimeException("Field access failed: " + field.getName(), e);
            }
        }
        return row;
    }

    /** Renders date/time values as text; every other value is passed through unchanged. */
    private static Object normalize(Field field, Object value) {
        if (value == null) {
            return null;
        }
        if (field.getType() == Date.class) {
            return DateUtil.format((Date) value, DATE_TIME_PATTERN);
        }
        if (value instanceof java.time.LocalDateTime) {
            // A plain ObjectMapper cannot serialize java.time types, so hand over a string instead.
            return DATE_TIME_FORMATTER.format((java.time.LocalDateTime) value);
        }
        return value;
    }

    /** Returns the cached field list for {@code clazz}, scanning the class on first use. */
    private static List<Field> getOrderedFields(Class<?> clazz) {
        return FIELD_CACHE.computeIfAbsent(clazz, DorisMapper::scanOrderedFields);
    }

    /** Collects the declared, non-ignored {@code @DorisField} fields sorted by {@code order()}. */
    private static List<Field> scanOrderedFields(Class<?> clazz) {
        List<Field> fields = Arrays.stream(clazz.getDeclaredFields())
                .filter(f -> f.isAnnotationPresent(DorisField.class))
                .filter(f -> !f.getAnnotation(DorisField.class).ignore())
                .sorted(Comparator.comparingInt(f -> f.getAnnotation(DorisField.class).order()))
                .collect(Collectors.toList());
        // Make the fields readable once here rather than for every row.
        fields.forEach(f -> f.setAccessible(true));
        return Collections.unmodifiableList(fields);
    }
}
5. Doris Stream Load Core Class
package com.example.doris.core;
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.HttpEntityWrapper;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicHeader;
import org.apache.http.util.EntityUtils;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.zip.GZIPOutputStream;
/**
 * Sends rows to Doris through the HTTP Stream Load interface.
 *
 * <p>Data is split into batches of {@code batchSize} rows; each batch is serialized as CSV or JSON
 * and sent by a worker from a fixed pool of {@code maxParallel} threads. A failed batch is retried
 * on the same worker with exponential back-off, so the caller of {@link #parallelStreamLoad} only
 * returns once every batch has either succeeded or exhausted its retries.
 *
 * <p>NOTE(review): requests carry no {@code label} header, so a retry after a lost response may load
 * the same batch twice — confirm whether idempotent labels are required.
 */
@Slf4j
@Component
public class DorisStreamLoader implements AutoCloseable {
    /** Shared JSON serializer; creating one per batch is unnecessary. */
    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
    /** Caps the back-off exponent so the delay computation cannot overflow. */
    private static final int MAX_BACKOFF_SHIFT = 16;

    private final DorisStreamLoadProperties properties;
    private final String authEncoding;
    private final CloseableHttpClient httpClient;
    private final ExecutorService executorService;

    /**
     * Builds the HTTP client and worker pool from the given settings.
     *
     * @param properties Stream Load settings
     */
    public DorisStreamLoader(@Qualifier("dorisStreamLoadProperties") DorisStreamLoadProperties properties) {
        this.properties = properties;
        this.authEncoding = Base64.getEncoder().encodeToString(
                (properties.getUsername() + ":" + properties.getPassword()).getBytes(StandardCharsets.UTF_8));
        this.httpClient = HttpClients.custom()
                .setConnectionTimeToLive(60, TimeUnit.SECONDS)
                .setMaxConnTotal(properties.getMaxParallel() * 2)
                .setMaxConnPerRoute(properties.getMaxParallel())
                // The Doris FE answers a Stream Load PUT with a redirect to a BE node; the default
                // strategy only follows redirects for GET/HEAD, so PUT has to be allowed explicitly.
                .setRedirectStrategy(new org.apache.http.impl.client.DefaultRedirectStrategy() {
                    @Override
                    protected boolean isRedirectable(String method) {
                        return true;
                    }
                })
                .build();
        this.executorService = Executors.newFixedThreadPool(properties.getMaxParallel());
    }

    /**
     * Loads {@code data} in parallel batches and blocks until all batches are done.
     *
     * @param data      rows to load; {@code null} or empty is a no-op
     * @param format    {@code "csv"} (case-insensitive) for CSV, anything else for JSON
     * @param dbName    first argument substituted into the configured URL
     * @param tableName second argument substituted into the configured URL
     * @throws CompletionException if any batch finally fails
     */
    public void parallelStreamLoad(List<Map<String, Object>> data, String format, String dbName, String tableName) {
        if (data == null || data.isEmpty()) {
            return;
        }
        CompletableFuture<?>[] pending = Lists.partition(data, properties.getBatchSize()).stream()
                .map(batch -> CompletableFuture.runAsync(() -> {
                    try {
                        sendBatch(batch, format, dbName, tableName);
                    } catch (RuntimeException e) {
                        log.error("Stream Load batch failed", e);
                        throw e;
                    }
                }, executorService))
                .toArray(CompletableFuture[]::new);
        CompletableFuture.allOf(pending).join();
    }

    /**
     * Shuts down the worker pool and closes the HTTP client.
     *
     * <p>NOTE(review): Spring is expected to call this on context shutdown for AutoCloseable beans —
     * verify for the Spring version in use.
     */
    @Override
    public void close() {
        executorService.shutdown();
        try {
            httpClient.close();
        } catch (IOException e) {
            log.warn("Failed to close Doris HTTP client", e);
        }
    }

    /** Serializes one batch and sends it, retrying on failure. */
    private void sendBatch(List<Map<String, Object>> batch, String format, String dbName, String tableName) {
        String payload = "csv".equalsIgnoreCase(format) ? convertToCsv(batch) : convertToJson(batch);
        attemptSendWithRetry(payload, format, 0, dbName, tableName);
    }

    /**
     * Sends the payload, retrying with exponential back-off (1s, 2s, 4s, ...) on the calling thread.
     * Running the retries synchronously keeps the batch's future pending until the final outcome is
     * known and lets the final failure reach the caller.
     *
     * @throws RuntimeException once {@code maxRetries} retries have failed
     */
    private void attemptSendWithRetry(String data, String format, int retryCount, String dbName, String tableName) {
        int attempt = retryCount;
        while (true) {
            try {
                sendToDoris(data, format, dbName, tableName);
                log.info("Stream Load succeeded");
                return;
            } catch (Exception e) {
                if (attempt >= properties.getMaxRetries()) {
                    log.error("Stream Load reached max retries", e);
                    throw new RuntimeException("Stream Load batch failed: " + e.getMessage(), e);
                }
                long delay = 1000L << Math.min(attempt, MAX_BACKOFF_SHIFT);
                attempt++;
                log.warn("Stream Load failed, retry {} after {}ms", attempt, delay, e);
                pause(delay);
            }
        }
    }

    /** Sleeps for the back-off delay, restoring the interrupt flag if interrupted. */
    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting to retry Stream Load", interrupted);
        }
    }

    /** Serializes the batch as a JSON array. */
    private String convertToJson(List<Map<String, Object>> batch) {
        try {
            return JSON_MAPPER.writeValueAsString(batch);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("JSON conversion failed", e);
        }
    }

    /**
     * Serializes the batch as CSV using each row's value order; {@code null} becomes {@code \N}.
     * The configured separators are decoded first so that an escaped setting such as {@code \x01}
     * is written as the actual control character Doris will split on.
     * Values are not quoted or escaped.
     */
    private String convertToCsv(List<Map<String, Object>> batch) {
        String columnSeparator = unescapeSeparator(properties.getColumnSeparator());
        String lineSeparator = unescapeSeparator(properties.getLineSeparator());
        return batch.stream()
                .map(row -> row.values().stream()
                        .map(v -> v == null ? "\\N" : v.toString())
                        .collect(Collectors.joining(columnSeparator)))
                .collect(Collectors.joining(lineSeparator));
    }

    /**
     * Decodes the escape sequences {@code \t}, {@code \n}, {@code \r} and {@code \xHH} into the
     * characters they denote; all other text is copied as-is.
     */
    static String unescapeSeparator(String configured) {
        if (configured == null) {
            return "";
        }
        StringBuilder out = new StringBuilder(configured.length());
        int i = 0;
        while (i < configured.length()) {
            char c = configured.charAt(i);
            if (c == '\\' && i + 1 < configured.length()) {
                char next = configured.charAt(i + 1);
                if (next == 't' || next == 'n' || next == 'r') {
                    out.append(next == 't' ? '\t' : next == 'n' ? '\n' : '\r');
                    i += 2;
                    continue;
                }
                if (next == 'x' && i + 3 < configured.length()
                        && Character.digit(configured.charAt(i + 2), 16) >= 0
                        && Character.digit(configured.charAt(i + 3), 16) >= 0) {
                    out.append((char) Integer.parseInt(configured.substring(i + 2, i + 4), 16));
                    i += 4;
                    continue;
                }
            }
            out.append(c);
            i++;
        }
        return out.toString();
    }

    /**
     * Encodes a separator for use as an HTTP header value: control characters are written as
     * {@code \t}, {@code \n}, {@code \r} or {@code \xHH} because raw control characters are not
     * valid inside a header.
     */
    static String escapeForHeader(String separator) {
        StringBuilder out = new StringBuilder();
        for (char c : separator.toCharArray()) {
            if (c == '\t') {
                out.append("\\t");
            } else if (c == '\n') {
                out.append("\\n");
            } else if (c == '\r') {
                out.append("\\r");
            } else if (c < 0x20 || c == 0x7f) {
                out.append(String.format("\\x%02x", (int) c));
            } else {
                out.append(c);
            }
        }
        return out.toString();
    }

    /**
     * Performs one Stream Load PUT request.
     *
     * @throws IOException on transport errors or when Doris does not report a successful load
     */
    private void sendToDoris(String data, String format, String dbName, String tableName) throws IOException {
        HttpPut httpPut = new HttpPut(String.format(properties.getUrl(), dbName, tableName));
        httpPut.setHeader("Authorization", "Basic " + authEncoding);
        // Stream Load expects the client to announce the body with Expect: 100-continue.
        httpPut.setHeader("Expect", "100-continue");
        httpPut.setConfig(RequestConfig.custom()
                .setConnectTimeout(properties.getConnectTimeout())
                .setSocketTimeout(properties.getSocketTimeout())
                .build());
        if ("csv".equalsIgnoreCase(format)) {
            httpPut.setHeader("Content-Type", "text/plain");
            httpPut.setHeader("format", "csv");
            httpPut.setHeader("column_separator",
                    escapeForHeader(unescapeSeparator(properties.getColumnSeparator())));
            httpPut.setHeader("line_delimiter",
                    escapeForHeader(unescapeSeparator(properties.getLineSeparator())));
        } else {
            httpPut.setHeader("Content-Type", "application/json");
            httpPut.setHeader("format", "json");
            httpPut.setHeader("strip_outer_array", "true");
        }
        HttpEntity entity;
        if ("gzip".equals(properties.getCompression())) {
            httpPut.setHeader("compress_type", "gz");
            entity = new GzipCompressingEntity(data);
        } else {
            entity = new StringEntity(data, StandardCharsets.UTF_8);
        }
        httpPut.setEntity(entity);
        try (CloseableHttpResponse response = httpClient.execute(httpPut)) {
            int statusCode = response.getStatusLine().getStatusCode();
            HttpEntity responseEntity = response.getEntity();
            String body = responseEntity == null ? "" : EntityUtils.toString(responseEntity, StandardCharsets.UTF_8);
            LoadResponse result = body.isEmpty() ? null : JSONObject.parseObject(body, LoadResponse.class);
            if (result == null) {
                log.error("Stream Load failed: HTTP {} - {}", statusCode, body);
                throw new IOException("Doris Stream Load failed: HTTP " + statusCode + " - " + body);
            }
            // "Publish Timeout" means the load was committed but is not visible yet; retrying it
            // would load the data a second time, so it is accepted as success.
            boolean loaded = "Success".equals(result.getStatus()) || "Publish Timeout".equals(result.getStatus());
            if (!loaded) {
                log.error("Stream Load failed: HTTP {} - {}", statusCode, result);
                throw new IOException("Doris Stream Load failed: " + result);
            }
            log.info("Loaded {} rows in {} ms", result.getNumberLoadedRows(), result.getLoadTimeMs());
        }
    }

    /** Entity that holds the GZIP-compressed UTF-8 bytes of a string payload. */
    static class GzipCompressingEntity extends HttpEntityWrapper {
        /**
         * @param data payload to compress
         * @throws IOException if compression fails
         */
        public GzipCompressingEntity(String data) throws IOException {
            super(createEntity(data));
        }

        /** Compresses the payload eagerly into a repeatable in-memory entity. */
        private static HttpEntity createEntity(String data) throws IOException {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (GZIPOutputStream gzip = new GZIPOutputStream(bos)) {
                gzip.write(data.getBytes(StandardCharsets.UTF_8));
            }
            return new ByteArrayEntity(bos.toByteArray());
        }

        @Override
        public Header getContentEncoding() {
            return new BasicHeader("Content-Encoding", "gzip");
        }
    }
}
6. API Response Class (LoadResponse)
package com.fantaibao.application.config;
import lombok.Data;
/**
 * Holds the JSON body returned by a Doris Stream Load request.
 *
 * <p>Field names keep the capitalization used in the response; Lombok's {@code @Data}
 * generates the accessors.
 */
@Data
public class LoadResponse {
    private long TxnId;
    private String Label;
    private String Comment;
    private boolean TwoPhaseCommit;
    /** Load outcome; the loader treats {@code "Success"} as a successful load. */
    private String Status;
    private String Message;
    private int NumberTotalRows;
    private int NumberLoadedRows;
    private int NumberFilteredRows;
    private int NumberUnselectedRows;
    /** Bytes loaded; {@code long} because a single load can exceed the 2 GiB range of {@code int}. */
    private long LoadBytes;
    private int LoadTimeMs;
    private int BeginTxnTimeMs;
    private int StreamLoadPutTimeMs;
    private int ReadDataTimeMs;
    private int WriteDataTimeMs;
    private int ReceiveDataTimeMs;
    private int CommitAndPublishTimeMs;
}
7. Table Name Utility
package com.fantaibao.util;
import com.baomidou.mybatisplus.annotation.TableName;
import java.util.concurrent.ConcurrentHashMap;
import java.util.*;
/** Resolves the table name that belongs to an entity class. */
public class TableUtils {
    /**
     * Returns the table name for {@code entityClass}.
     *
     * <p>A non-blank {@code @TableName} value wins (surrounding whitespace is removed). Otherwise the
     * simple class name is converted from CamelCase to snake_case, e.g. {@code UserInfo} becomes
     * {@code user_info}.
     *
     * @param entityClass entity type, must not be {@code null}
     * @return the resolved table name
     * @throws NullPointerException if {@code entityClass} is {@code null}
     */
    public static <T> String getTableName(Class<T> entityClass) {
        Objects.requireNonNull(entityClass, "entityClass");
        TableName tableNameAnnotation = entityClass.getAnnotation(TableName.class);
        if (tableNameAnnotation != null) {
            String declared = tableNameAnnotation.value().trim();
            // A blank value carries no table name, so fall through to the derived one.
            if (!declared.isEmpty()) {
                return declared;
            }
        }
        // Insert "_" before every capital except a leading one, so no character has to be cut off
        // afterwards; Locale.ROOT keeps the lower-casing independent of the default locale.
        return entityClass.getSimpleName()
                .replaceAll("(?<!^)([A-Z])", "_$1")
                .toLowerCase(Locale.ROOT);
    }
}
8. Entity Example (User)
package com.example.doris.entity;
import com.baomidou.mybatisplus.annotation.TableName;
import com.fantaibao.constants.DorisField;
import lombok.*;
import java.time.LocalDateTime;
/**
 * Example entity showing how {@code @DorisField} controls column name, order and exclusion.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@TableName("user")
public class User {
    /** Column {@code id}, first in the row. */
    @DorisField(order = 1)
    private Long id;
    /** Stored in column {@code user_name}. */
    @DorisField(value = "user_name", order = 2)
    private String name;
    /** Column {@code age}. */
    @DorisField(order = 3)
    private Integer age;
    /** Stored in column {@code account_status}. */
    @DorisField(value = "account_status", order = 4)
    private Integer status;
    @DorisField(ignore = true)
    private String password; // ignored
    private String email; // no annotation, not mapped
    /** Column {@code createTime}, last in the row. */
    @DorisField(order = 5)
    private LocalDateTime createTime;
}
9. Service Layer Implementation
package com.example.doris.service;
import com.example.doris.core.DorisStreamLoader;
import com.example.doris.entity.User;
import com.example.doris.util.DorisMapper;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
/**
 * Example service that writes {@link User} rows to Doris through the stream loader.
 */
@Service
@RequiredArgsConstructor
public class UserService {
    private final DorisStreamLoader dorisStreamLoader;

    /**
     * Batch update user status; every row is stamped with an {@code update_time} column.
     *
     * @param users users to load
     */
    public void batchUpdateUserStatus(List<User> users) {
        loadUsers(users, "update_time");
    }

    /**
     * Batch create users; every row is stamped with a {@code create_time} column.
     *
     * @param users users to load
     */
    public void batchCreateUsers(List<User> users) {
        loadUsers(users, "create_time");
    }

    /** Maps the users to Doris rows, adds the current time under {@code timestampColumn} and loads them as JSON. */
    private void loadUsers(List<User> users, String timestampColumn) {
        Map<String, Object> additionalFields = new HashMap<>();
        additionalFields.put(timestampColumn, LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
        List<Map<String, Object>> dorisData = DorisMapper.convertToDorisData(users, additionalFields);
        // NOTE(review): UserProvider is not defined in this article; presumably it yields the
        // tenant's database name — confirm against the real project.
        dorisStreamLoader.parallelStreamLoad(dorisData, "json",
                UserProvider.getUser().getTenantDbConnectionString(),
                // The rows are User entities, so the target table is resolved from User.
                TableUtils.getTableName(User.class));
    }
}
10. Configuration Bean
package com.example.doris.config;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that enables binding of {@code DorisStreamLoadProperties} and
 * declares a {@code DorisStreamLoader} bean built from those properties.
 *
 * <p>NOTE(review): the DorisStreamLoader class shown in this article is also annotated with
 * {@code @Component}, so the loader may end up registered twice under the same bean name —
 * confirm which registration is intended.
 */
@Configuration
@EnableConfigurationProperties(DorisStreamLoadProperties.class)
public class DorisConfig {
    /** Creates the loader from the properties bean named {@code dorisStreamLoadProperties}. */
    @Bean
    public DorisStreamLoader dorisStreamLoader(@Qualifier("dorisStreamLoadProperties") DorisStreamLoadProperties properties) {
        return new DorisStreamLoader(properties);
    }
}
11. Application YAML
doris:
  stream-load:
    # The two %s placeholders are filled with the database and table name, because the
    # loader builds the request URL with String.format(url, dbName, tableName).
    url: "http://doris-fe:8030/api/%s/%s/_stream_load"
    username: admin
    password: "secure_password"
    connect-timeout: 60000    # 60 seconds
    socket-timeout: 300000    # 5 minutes
    batch-size: 100000        # 100k rows per batch
    max-retries: 5            # up to 5 retries
    compression: gzip         # enable GZIP
    max-parallel: 8           # 8 parallel threads
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
Solution Highlights and Advantages
Automatic field mapping via @DorisField supports renaming, ordering and ignoring fields.
High‑performance processing: batches of up to 100k rows, 8‑thread parallelism, optional GZIP compression, and exponential back‑off retry.
Intelligent cache for reflection results to avoid repeated scanning.
Result monitoring logs success count, load time and detailed response.
Performance Optimization Suggestions
Adjust batch size according to Doris cluster capacity, e.g., batch-size: 50000.
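Increase parallelism, typically to about twice the number of CPU cores (e.g., max-parallel: 16 on an 8‑core host); note that the value must be a literal number, since an expression such as ${CPU_CORES*2} is not evaluated.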
Enable compression (compression: gzip) to reduce network traffic.
Increase timeouts for large data volumes, e.g., connect-timeout: 120000, socket-timeout: 600000.
Cluster‑side tuning: increase streaming_load_max_mb and max_running_txn_num_per_db as needed.
Code Ape Tech Column
Former Ant Group P8 engineer, pure technologist, sharing full‑stack Java, job interview and career advice through a column. Site: java-family.cn
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
