Master Spring Batch Partitioning in Spring Boot 3 to Process Millions of Records Efficiently

This article demonstrates how to use Spring Batch partitioning in Spring Boot 3. It covers the manager/worker step architecture, a custom partitioner implementation, job and step configuration, the essential beans, and a complete runnable example that processes millions of records with parallel threads.

Spring Full-Stack Practical Cases

The Spring Boot 3 practical case collection has been updated to 152 examples. This article focuses on a Spring Batch partitioning case for processing large volumes of data efficiently.

1. Introduction

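By default, Spring Batch executes each step in a single thread, so a step works through its entire input one chunk at a time. To process data in parallel, we partition the step: its input is split into distinct data ranges, and multiple workers process those ranges concurrently.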

2. Core Concepts

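In the job diagram, a manager step coordinates several identical worker steps. Workers can be remote services or local threads, and each one receives a distinct data range through its own ExecutionContext. The manager obtains these contexts by calling Partitioner.partition(gridSize), and a PartitionHandler hands one to each worker; with a local TaskExecutor, that handler is a TaskExecutorPartitionHandler. The JobRepository ensures that each worker runs exactly once per job execution.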
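The coordination described above can be modeled in plain Java, without a database or Spring. This is a sketch under stated assumptions, not Spring Batch code: the class PartitionedStepModel and its run method are hypothetical, a Map<String, long[]> of inclusive id ranges stands in for the Map<String, ExecutionContext> a Partitioner returns, and a fixed thread pool stands in for the step's TaskExecutor. The manager submits every partition exactly once and waits for all of them; each worker touches only its own range.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical JDK-only model of a partitioned step (not Spring Batch code).
 * Each map entry stands in for one partition's ExecutionContext and holds an
 * inclusive {minValue, maxValue} id range.
 */
final class PartitionedStepModel {

    /** Records processed per partition, plus the most workers seen running at once. */
    record Result(Map<String, Long> countsByPartition, int peakConcurrency) {}

    static Result run(Map<String, long[]> partitions, int maxThreads) {
        Map<String, Long> counts = new ConcurrentHashMap<>();
        AtomicInteger active = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(maxThreads);
        try {
            List<Future<?>> futures = new ArrayList<>();
            // Manager: submit every partition exactly once.
            for (Map.Entry<String, long[]> partition : partitions.entrySet()) {
                futures.add(pool.submit(() -> {
                    peak.accumulateAndGet(active.incrementAndGet(), Math::max);
                    try {
                        long[] range = partition.getValue();
                        long processed = 0;
                        // Worker: handle only the ids inside its own inclusive range.
                        for (long id = range[0]; id <= range[1]; id++) {
                            processed++; // stand-in for reading and writing one record
                        }
                        counts.put(partition.getKey(), processed);
                    } finally {
                        active.decrementAndGet();
                    }
                }));
            }
            // Manager: wait for all workers; a worker failure surfaces here.
            for (Future<?> future : futures) {
                future.get();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a worker failed", e.getCause());
        } finally {
            pool.shutdown();
        }
        return new Result(counts, peak.get());
    }
}
```

Capping the pool size also reflects a practical constraint: when each worker holds a database connection, running more concurrent workers than the connection pool allows only makes threads wait.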

3. Dependency

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
  <groupId>com.mysql</groupId>
  <artifactId>mysql-connector-j</artifactId>
</dependency>

4. Enable Batch Processing

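In Spring Boot 3 this annotation is optional, and adding it makes Spring Boot's batch auto-configuration back off. The annotation still registers the JobRepository, JobLauncher and related beans, but two things change: the job is no longer launched automatically at startup (the TaskRunner in section 8 launches it instead), and the BATCH_ metadata tables are not created, so they must already exist (Spring Batch ships DDL such as schema-mysql.sql in spring-batch-core).

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class AppConfig {}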

5. Custom Partitioner

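import java.util.HashMap;
import java.util.Map;

import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.jdbc.core.simple.JdbcClient;

// Splits [MIN(id), MAX(id)] of o_user into at most gridSize contiguous ranges.
// minValue and maxValue are both inclusive.
public class IdRangePartitioner implements Partitioner {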

  private final JdbcClient jdbcClient;

  public IdRangePartitioner(JdbcClient jdbcClient) {
    this.jdbcClient = jdbcClient;
  }

  @Override
  public Map<String, ExecutionContext> partition(int gridSize) {
    int min = this.jdbcClient.sql("SELECT MIN(id) FROM o_user").query(Integer.class).single();
    int max = this.jdbcClient.sql("SELECT MAX(id) FROM o_user").query(Integer.class).single();
    int targetSize = (max - min) / gridSize + 1;

    Map<String, ExecutionContext> result = new HashMap<>();
    int number = 0;
    int start = min;
    int end = start + targetSize - 1;
    while (start <= max) {
      ExecutionContext value = new ExecutionContext();
      result.put("partition" + number, value);
      if (end >= max) {
        end = max;
      }
      value.putInt("minValue", start);
      value.putInt("maxValue", end);
      start += targetSize;
      end += targetSize;
      number++;
    }
    return result;
  }
}
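The splitting arithmetic can be checked without a database. The hypothetical IdRangeSplitter below is a JDK-only restatement of the loop in IdRangePartitioner, with MIN(id) and MAX(id) passed in as arguments instead of queried; it uses Math.min for the clamp but produces the same ranges.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical JDK-only restatement of IdRangePartitioner's splitting loop.
 * MIN(id) and MAX(id) are passed in rather than queried; each value is an
 * inclusive {minValue, maxValue} range keyed "partition0", "partition1", ...
 */
final class IdRangeSplitter {

    static Map<String, int[]> split(int min, int max, int gridSize) {
        // Same formula as the partitioner: large enough that at most gridSize ranges cover [min, max].
        int targetSize = (max - min) / gridSize + 1;
        Map<String, int[]> result = new LinkedHashMap<>();
        int number = 0;
        for (int start = min; start <= max; start += targetSize) {
            // Clamp the last range so it never runs past MAX(id).
            int end = Math.min(start + targetSize - 1, max);
            result.put("partition" + number, new int[] {start, end});
            number++;
        }
        return result;
    }
}
```

For MIN(id) = 1, MAX(id) = 1,000,000 and gridSize = 50, targetSize is 999,999 / 50 + 1 = 20,000 with integer division, giving 50 ranges from [1, 20000] to [980001, 1000000]. Since both bounds are inclusive, the worker's query has to use id <= maxValue; with id < maxValue, the row whose id equals each partition's upper bound (when such a row exists) is never read.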

6. Job Configuration

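import java.util.HashMap;
import java.util.Map;

import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.jdbc.core.simple.JdbcClient;
import org.springframework.transaction.PlatformTransactionManager;

@Configuration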
public class JobConfig {

  private final JobRepository jobRepository;
  private final PlatformTransactionManager transactionManager;
  private final DataSource dataSource;

  public JobConfig(JobRepository jobRepository, PlatformTransactionManager transactionManager, DataSource dataSource) {
    this.jobRepository = jobRepository;
    this.transactionManager = transactionManager;
    this.dataSource = dataSource;
  }

  @Bean
  IdRangePartitioner partitioner(JdbcClient jdbcClient) {
    return new IdRangePartitioner(jdbcClient);
  }

  @Bean
  @StepScope
  JdbcPagingItemReader<User> pagingItemReader(@Value("#{stepExecutionContext['minValue']}") Long minValue,
                                             @Value("#{stepExecutionContext['maxValue']}") Long maxValue) {
    System.out.println("reading " + minValue + " to " + maxValue);
    Map<String, Order> sortKeys = new HashMap<>();
    sortKeys.put("id", Order.ASCENDING);
    MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
    queryProvider.setSelectClause("id, name, age, phone, sex");
    queryProvider.setFromClause("from o_user");
    // Partition bounds are inclusive at both ends, so the upper bound must use <=.
    queryProvider.setWhereClause("where id >= " + minValue + " and id <= " + maxValue);
    queryProvider.setSortKeys(sortKeys);
    JdbcPagingItemReader<User> reader = new JdbcPagingItemReader<>();
    reader.setDataSource(this.dataSource);
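    reader.setPageSize(1000);  // rows per page query; the default is 10
    reader.setFetchSize(1000); // JDBC fetch-size hint passed to the driver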
    reader.setRowMapper(new UserRowMapper());
    reader.setQueryProvider(queryProvider);
    return reader;
  }

  @Bean
  @StepScope
  JdbcBatchItemWriter<User> userItemWriter() {
    JdbcBatchItemWriter<User> writer = new JdbcBatchItemWriter<>();
    writer.setDataSource(dataSource);
    writer.setSql("insert into n_user values (:id, :name, :age, :phone, :sex)");
    writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
    return writer;
  }

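  // Manager step: calls the partitioner, then runs slaveStep (the worker) once per partition.
  @Bean
  Step stepMaster(IdRangePartitioner partitioner) {
    // SimpleAsyncTaskExecutor starts a new thread per partition; cap concurrency so that
    // active workers do not exceed the JDBC pool (HikariCP defaults to 10 connections).
    SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor("partition-");
    taskExecutor.setConcurrencyLimit(10);
    return new StepBuilder("stepMaster", jobRepository)
        .partitioner(slaveStep().getName(), partitioner)
        .step(slaveStep())
        .gridSize(50)
        .taskExecutor(taskExecutor)
        .build();
  }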

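  // Worker step: reads and writes one partition's id range in chunks of 1000 records.
  @Bean
  Step slaveStep() {
    return new StepBuilder("slaveStep", jobRepository)
        .<User, User>chunk(1000, transactionManager)
        // The null arguments are placeholders: the @StepScope proxy creates a reader per
        // partition and injects minValue/maxValue from that partition's ExecutionContext.
        .reader(pagingItemReader(null, null))
        .writer(userItemWriter())
        .build();
  }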

  @Bean
  Job job(@Qualifier("stepMaster") Step stepMaster) {
    return new JobBuilder("job", jobRepository)
        .start(stepMaster)
        .build();
  }
}
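JdbcPagingItemReader does not read a partition in one query. Spring Batch's paging query providers use keyset ("seek") paging: roughly, the first page is SELECT ... ORDER BY id ASC LIMIT pageSize, and each later page adds a condition like id > (last id seen). The JDK-only model below is hypothetical (the class name and the sorted int[] standing in for a partition's rows are illustrative, and no SQL runs); it counts how many page queries one partition costs and stops at the first page that comes back shorter than pageSize. It shows why the reader's pageSize, which is 10 by default, largely determines how many queries a partition issues, while fetchSize is only a hint to the JDBC driver.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical JDK-only model of keyset ("seek") paging over one partition.
 * A sorted int[] stands in for the table; no SQL is executed.
 */
final class KeysetPagingModel {

    /**
     * Reads every id in [minValue, maxValue] page by page into out
     * and returns how many page queries were issued.
     */
    static int readAll(int[] sortedIds, int minValue, int maxValue, int pageSize, List<Integer> out) {
        if (pageSize < 1) {
            throw new IllegalArgumentException("pageSize must be positive");
        }
        int queries = 0;
        Integer lastId = null; // null: no page read yet (the "first page" query)
        while (true) {
            queries++;
            // Models: WHERE id >= minValue AND id <= maxValue [AND id > lastId] ORDER BY id LIMIT pageSize
            List<Integer> page = new ArrayList<>(pageSize);
            for (int id : sortedIds) {
                boolean inPartition = id >= minValue && id <= maxValue;
                boolean afterLastSeen = lastId == null || id > lastId;
                if (inPartition && afterLastSeen) {
                    page.add(id);
                    if (page.size() == pageSize) {
                        break;
                    }
                }
            }
            out.addAll(page);
            if (page.size() < pageSize) {
                return queries; // a short or empty page means the partition is exhausted
            }
            lastId = page.get(page.size() - 1); // remember the sort key of the last row
        }
    }
}
```

With 100 dense ids and pageSize = 10, the model issues 11 queries (ten full pages plus one empty page that ends the read); with pageSize = 1000 it issues one.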

7. Supporting Classes

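// User.java
public class User {
  private Integer id;
  private String name;
  private Integer age;
  private String phone;
  private String sex;
  // getters and setters (BeanPropertyItemSqlParameterSourceProvider reads :id, :name, ... through the getters)
}

// UserRowMapper.java
import java.sql.ResultSet;
import java.sql.SQLException;

import org.springframework.jdbc.core.RowMapper;

public class UserRowMapper implements RowMapper<User> {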
  @Override
  public User mapRow(ResultSet rs, int rowNum) throws SQLException {
    User user = new User();
    user.setId(rs.getInt("id"));
    user.setName(rs.getString("name"));
    user.setPhone(rs.getString("phone"));
    user.setSex(rs.getString("sex"));
    user.setAge(rs.getInt("age"));
    return user;
  }
}

8. Test Runner

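import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Component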
public class TaskRunner implements CommandLineRunner {

  private final JobLauncher jobLauncher;
  private final Job job;

  public TaskRunner(JobLauncher jobLauncher, Job job) {
    this.jobLauncher = jobLauncher;
    this.job = job;
  }

  @Override
  public void run(String... args) throws Exception {
    JobParameters params = new JobParametersBuilder()
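        // A new value on every start creates a new JobInstance; re-launching a COMPLETED
        // instance with the same identifying parameters is rejected by Spring Batch.
        .addString("JobId", String.valueOf(System.currentTimeMillis()))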
        .toJobParameters();
    JobExecution execution = jobLauncher.run(job, params);
    System.err.println("STATUS :: " + execution.getStatus());
  }
}
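The JobId timestamp in TaskRunner matters because Spring Batch identifies a JobInstance by the job name plus its identifying JobParameters: launching an instance whose last execution COMPLETED with the same identifying parameters fails with JobInstanceAlreadyCompleteException, while a FAILED instance can be restarted by default. The hypothetical in-memory registry below models only that rule for those two statuses; it is not how the JobRepository is implemented.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical in-memory model (not Spring Batch code) of JobInstance identity:
 * job name + identifying parameters, where a COMPLETED instance cannot be launched again.
 */
final class JobInstanceRegistryModel {

    enum Status { COMPLETED, FAILED }

    // Last recorded status per job instance key.
    private final Map<String, Status> lastStatusByInstance = new HashMap<>();

    private static String instanceKey(String jobName, Map<String, String> identifyingParams) {
        // TreeMap sorts the parameters, so insertion order does not change the key.
        return jobName + new TreeMap<>(identifyingParams);
    }

    /** False where Spring Batch would refuse the launch because the instance already completed. */
    boolean canLaunch(String jobName, Map<String, String> identifyingParams) {
        return lastStatusByInstance.get(instanceKey(jobName, identifyingParams)) != Status.COMPLETED;
    }

    void recordRun(String jobName, Map<String, String> identifyingParams, Status status) {
        lastStatusByInstance.put(instanceKey(jobName, identifyingParams), status);
    }
}
```

Because the runner supplies a fresh timestamp on every start, each application launch maps to a new key in this model, which is why the example job can be run repeatedly.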

9. Execution Result

When the application starts, the console first prints the partition information (see image). After processing, the final status and statistics are displayed (see image).


Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.

Written by Spring Full-Stack Practical Cases

Full-stack Java development with Vue 2/3 front-end suite; hands-on examples and source code analysis for Spring, Spring Boot 2/3, and Spring Cloud.
