Boost Performance: Process Massive Datasets with Spring Batch and Virtual Threads

This article explains how Spring Batch's core concepts (chunk processing, metadata management, partitioning, and multi-threaded steps), combined with Java virtual threads, can raise throughput and improve resource efficiency when processing large, I/O-intensive data sets.

Senior Xiao Ying

Spring Batch

Spring Batch's core architecture consists of five concepts:

Job: encapsulates an entire batch process and is composed of one or more Steps. Each run of a Job is recorded in a JobRepository, which supports parameterized runs and restarts.

Step: an independent phase of a Job that contains its own processing logic; a Job's Steps can run sequentially or be chosen conditionally.

ItemReader: reads items one at a time from a data source (database, file, queue) and returns null when the input is exhausted.

ItemProcessor: applies business-logic transformation, filtering, or validation to each item. Returning null filters the item out.

ItemWriter: writes processed results to a target system. It receives a whole chunk of items at once, which allows efficient batch writes.
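The contracts of the three item roles can be sketched without Spring at all. The interfaces below (Reader, Processor, Writer) are simplified, hypothetical stand-ins for ItemReader, ItemProcessor and ItemWriter, not the real Spring Batch types; the sketch only shows the null conventions (end of input, filtered item) and the fact that the writer receives many items in one call.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class ItemRolesDemo {

    // Simplified stand-ins for ItemReader, ItemProcessor and ItemWriter (not the Spring Batch types)
    interface Reader<T> { T read(); }                            // null signals end of input
    interface Processor<I, O> { O process(I item); }             // null filters the item out
    interface Writer<O> { void write(List<? extends O> items); } // receives many items in one call

    static <T> Reader<T> fromList(List<T> source) {
        Iterator<T> it = source.iterator();
        return () -> it.hasNext() ? it.next() : null;
    }

    // Reads until null, processes each item, drops filtered items, then writes the survivors at once.
    static <I, O> void run(Reader<I> reader, Processor<I, O> processor, Writer<O> writer) {
        List<O> survivors = new ArrayList<>();
        for (I item = reader.read(); item != null; item = reader.read()) {
            O out = processor.process(item);
            if (out != null) {
                survivors.add(out);
            }
        }
        writer.write(survivors);
    }

    static List<String> demo() {
        List<String> written = new ArrayList<>();
        Processor<String, String> upperCaseNonBlank = s -> s.isBlank() ? null : s.toUpperCase();
        Writer<String> collect = items -> written.addAll(items);
        run(fromList(List.of("alice", " ", "bob")), upperCaseNonBlank, collect);
        return written;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [ALICE, BOB]
    }
}
```

The blank entry never reaches the writer because the processor returned null for it, which is the same filtering rule Spring Batch applies.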

Chunk Processing Mode

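Chunk-oriented processing groups items into fixed-size chunks. The example bean below defines a step that processes 1,000 records per chunk and adds fault tolerance: a failing item is attempted up to 3 times, and up to 100 items that throw DataValidationException (an application-defined exception) are skipped.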

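@Bean
public Step dataProcessingStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
    // Spring Batch 5 style: StepBuilderFactory is no longer exposed as a bean
    return new StepBuilder("dataProcessingStep", jobRepository)
        .<InputData, OutputData>chunk(1000, transactionManager) // each chunk contains 1,000 items
        .reader(itemReader())
        .processor(itemProcessor())
        .writer(itemWriter())
        .faultTolerant()
        .retryLimit(3)                       // up to 3 attempts per failing item
        .retry(Exception.class)              // broad: every exception type is retried
        .skipLimit(100)                      // tolerate at most 100 skipped items
        .skip(DataValidationException.class) // application-defined exception
        .build();
}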

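Within each chunk, items are read and processed one at a time until the chunk is full; the writer then receives the whole chunk, and the surrounding transaction commits only if the write succeeds. A failure rolls back the current chunk only; chunks that were already committed stay committed, so a restarted job can resume after the last committed chunk.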
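The all-or-nothing behavior of a chunk can be modeled in plain Java. This is a simplified simulation, not Spring Batch code: the in-memory committed list stands in for rows visible in a transactional database, the "processing" is an assumed multiply-by-10, and writeFails is a hypothetical hook that makes the writer fail for a given item.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;

final class ChunkCommitDemo {

    // Processes items in chunks and returns what was "committed"; writeFails simulates a writer error.
    static List<Integer> run(List<Integer> input, int chunkSize, IntPredicate writeFails) {
        List<Integer> committed = new ArrayList<>(); // stands in for rows visible in the database
        for (int start = 0; start < input.size(); start += chunkSize) {
            List<Integer> chunk = input.subList(start, Math.min(start + chunkSize, input.size()));

            // read + process: items are handled one at a time and buffered in memory
            List<Integer> processed = new ArrayList<>();
            for (int item : chunk) {
                processed.add(item * 10);
            }

            // write + commit: the writer gets the whole chunk, and the chunk is all-or-nothing
            if (chunk.stream().anyMatch(i -> writeFails.test(i))) {
                System.out.println("Rolled back chunk " + chunk + "; step fails");
                break; // without skip/retry the step stops; earlier chunks remain committed
            }
            committed.addAll(processed);
        }
        return committed;
    }

    public static void main(String[] args) {
        List<Integer> items = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        // chunk size 3; item 5 makes the second chunk [4, 5, 6] fail
        System.out.println(run(items, 3, i -> i == 5));
        // prints the rollback message, then [10, 20, 30]
    }
}
```

Only the first chunk survives: item 4 was processed successfully, but it shares a chunk with the failing item 5, so its result is discarded along with the rest of that chunk.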

Metadata Management and Monitoring

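Spring Batch stores execution metadata in a JobRepository, including:

JobInstance: a logical run of a Job, identified by the job name plus its identifying parameters.

JobExecution: a single attempt to run a JobInstance, with its status, start and end times, and exit status.

StepExecution: a single attempt to run a Step, with counts of items read, written, skipped, and committed.

This metadata enables job restart, status tracking, and fault recovery: a failed JobInstance can be restarted as a new JobExecution, while a completed one cannot be run again with the same identifying parameters.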
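The relationship between JobInstance and JobExecution can be modeled with a small in-memory map. This is an illustrative sketch, not the JobRepository API: InstanceKey, Status and launch are hypothetical names, and only the identity and restart rules described above are modeled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class JobMetadataDemo {

    enum Status { COMPLETED, FAILED }

    // A JobInstance is identified by the job name plus its identifying parameters.
    record InstanceKey(String jobName, Map<String, String> identifyingParams) {}

    // Each JobInstance owns the ordered list of its JobExecutions (one per attempt).
    private final Map<InstanceKey, List<Status>> executions = new HashMap<>();

    Status launch(String jobName, Map<String, String> params, boolean succeeds) {
        InstanceKey key = new InstanceKey(jobName, Map.copyOf(params));
        List<Status> attempts = executions.computeIfAbsent(key, k -> new ArrayList<>());
        // Mirrors the rule that a COMPLETED JobInstance cannot run again
        if (attempts.contains(Status.COMPLETED)) {
            throw new IllegalStateException("JobInstance already complete: " + key);
        }
        Status status = succeeds ? Status.COMPLETED : Status.FAILED;
        attempts.add(status);
        return status;
    }

    int instanceCount() {
        return executions.size();
    }

    int executionCount(String jobName, Map<String, String> params) {
        return executions.getOrDefault(new InstanceKey(jobName, Map.copyOf(params)), List.of()).size();
    }

    public static void main(String[] args) {
        JobMetadataDemo repository = new JobMetadataDemo();
        Map<String, String> day1 = Map.of("date", "2024-01-01");
        repository.launch("importJob", day1, false); // first attempt fails
        repository.launch("importJob", day1, true);  // restart: same instance, second execution
        repository.launch("importJob", Map.of("date", "2024-01-02"), true); // new parameters, new instance
        System.out.println(repository.instanceCount() + " instances; "
            + repository.executionCount("importJob", day1) + " executions for 2024-01-01");
        // prints: 2 instances; 2 executions for 2024-01-01
    }
}
```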

Virtual Thread Fundamentals

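Virtual threads (previewed in Java 19 and 20, final in Java 21) are lightweight threads with:

Very low resource consumption: millions can be created without exhausting system resources.

JVM-managed scheduling: a virtual thread is not permanently bound to an OS thread; the JVM runs it on a small pool of carrier (platform) threads.

Cheap blocking: when a virtual thread blocks on I/O, it is unmounted from its carrier, which is then free to run other virtual threads. This is why virtual threads help I/O-bound batch work but do not speed up CPU-bound processing.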
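A minimal sketch of the blocking behavior, assuming Java 21: each task gets its own virtual thread from Executors.newVirtualThreadPerTaskExecutor(), and Thread.sleep stands in for a blocking I/O call such as a JDBC query. For comparison, a fixed pool of 100 platform threads would need at least 100 rounds of 100 ms, or 10 seconds, to run 10,000 such tasks.

```java
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

final class VirtualThreadDemo {

    // Submits taskCount blocking tasks, one virtual thread each, and returns how many finished.
    static int runBlockingTasks(int taskCount, Duration blockFor) {
        AtomicInteger completed = new AtomicInteger();
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < taskCount; i++) {
                executor.submit(() -> {
                    Thread.sleep(blockFor); // stands in for a blocking I/O call
                    completed.incrementAndGet();
                    return null;            // Callable, so sleep's InterruptedException is allowed
                });
            }
        } // close() waits for every submitted task to finish
        return completed.get();
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        int done = runBlockingTasks(10_000, Duration.ofMillis(100));
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println(done + " tasks finished in " + elapsedMs + " ms");
    }
}
```

The elapsed time depends on the machine, but because sleeping virtual threads release their carriers, the tasks overlap instead of queuing behind a fixed number of OS threads.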

Spring Boot Virtual Thread Configuration

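import java.util.concurrent.Executors;

import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.web.embedded.tomcat.TomcatProtocolHandlerCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.core.task.support.TaskExecutorAdapter;

@Configuration
@ConditionalOnProperty(prefix = "spring", name = "virtual-thread", havingValue = "true")
public class VirtualThreadConfig {
    // Runs Spring's application-level async tasks (e.g. @Async methods) on virtual threads
    @Bean
    public AsyncTaskExecutor applicationTaskExecutor() {
        return new TaskExecutorAdapter(Executors.newVirtualThreadPerTaskExecutor());
    }

    // Makes embedded Tomcat handle each request on a virtual thread
    @Bean
    public TomcatProtocolHandlerCustomizer<?> protocolHandlerCustomizer() {
        return protocolHandler -> {
            protocolHandler.setExecutor(Executors.newVirtualThreadPerTaskExecutor());
        };
    }
}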

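Enable the class above with its custom property. On Spring Boot 3.2 and later (running on Java 21+), the built-in spring.threads.virtual.enabled property already configures virtual threads for Tomcat and the application task executor, so the custom class is only needed on older Boot versions:

spring.virtual-thread=true
spring.threads.virtual.enabled=true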

Partitioning

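For very large data sets, partitioning splits the input into subsets (for example, ID ranges) that copies of a worker step ("slaveStep" below) process in parallel, each with its own StepExecution and ExecutionContext. The manager step ("masterStep") asks the partitioner for 10 partitions and runs them on virtual threads.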

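@Bean
public Step masterStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
    return new StepBuilder("masterStep", jobRepository)
        .partitioner("slaveStep", partitioner())
        .step(slaveStep(jobRepository, transactionManager))
        .gridSize(10) // number of partitions requested from the partitioner
        // taskExecutor expects a Spring TaskExecutor, so the JDK ExecutorService is adapted
        .taskExecutor(new TaskExecutorAdapter(Executors.newVirtualThreadPerTaskExecutor()))
        .build();
}

@Bean
public Partitioner partitioner() {
    // CustomColumnRangePartitioner is application code (not shown) that splits the id column into ranges
    return new CustomColumnRangePartitioner(jdbcTemplate, "data_table", "id");
}

@Bean
public Step slaveStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
    return new StepBuilder("slaveStep", jobRepository)
        .<InputData, OutputData>chunk(1000, transactionManager)
        .reader(partitionAwareReader()) // step-scoped, so each partition gets its own reader
        .processor(processor())
        .writer(writer())
        .build();
}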
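The article does not show CustomColumnRangePartitioner, so the range arithmetic it presumably performs is an assumption here. The sketch below splits an inclusive key range [min, max] (for example, the MIN and MAX of the id column) into gridSize contiguous, non-overlapping ranges; in Spring Batch each range would be stored in one partition's ExecutionContext. The Range record and partition names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class ColumnRangeDemo {

    record Range(long minValue, long maxValue) {}

    // Splits [min, max] into at most gridSize contiguous ranges of (nearly) equal size.
    static Map<String, Range> partition(long min, long max, int gridSize) {
        if (gridSize < 1 || max < min) {
            throw new IllegalArgumentException("gridSize must be >= 1 and max >= min");
        }
        long total = max - min + 1;
        long size = (total + gridSize - 1) / gridSize; // ceiling division
        Map<String, Range> partitions = new LinkedHashMap<>();
        long start = min;
        for (int i = 0; start <= max; i++) {
            long end = Math.min(start + size - 1, max);
            partitions.put("partition" + i, new Range(start, end));
            start = end + 1;
        }
        return partitions;
    }

    public static void main(String[] args) {
        // Each line corresponds to one worker step's WHERE clause
        partition(1, 1_000_000, 10).forEach((name, r) ->
            System.out.println(name + ": id BETWEEN " + r.minValue() + " AND " + r.maxValue()));
    }
}
```

With ceiling division the last range absorbs the remainder, and very small key ranges can yield fewer partitions than gridSize (for example, two keys with gridSize 5 produce two partitions), which avoids creating empty worker steps.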

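Multi-Threaded Step Configuration

A multi-threaded step processes each chunk on its own thread, so the reader and writer must be thread-safe, and a reader's saved position is not reliable for restarts.

@Bean
public Step multiThreadedStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
    return new StepBuilder("multiThreadedStep", jobRepository)
        .<InputData, OutputData>chunk(1000, transactionManager)
        .reader(threadSafeReader()) // e.g. a reader wrapped in SynchronizedItemStreamReader
        .processor(processor())
        .writer(threadSafeWriter())
        // taskExecutor expects a Spring TaskExecutor, so the JDK ExecutorService is adapted
        .taskExecutor(new TaskExecutorAdapter(Executors.newVirtualThreadPerTaskExecutor()))
        .throttleLimit(20) // at most 20 chunks in flight; deprecated since Spring Batch 5.0
        .build();
}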
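Because virtual threads are not pooled, the thread count no longer caps concurrency by itself. A common way to bound concurrent work is a Semaphore, sketched below in plain Java; it is similar in spirit to throttleLimit but is not how Spring Batch implements it, and the 5 ms sleep is an assumed stand-in for a blocking write.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

final class ThrottleDemo {

    // Runs `tasks` tasks on virtual threads, letting at most `limit` execute their body at once,
    // and returns the highest concurrency actually observed.
    static int maxObservedConcurrency(int tasks, int limit) {
        Semaphore permits = new Semaphore(limit);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger maxRunning = new AtomicInteger();
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < tasks; i++) {
                executor.submit(() -> {
                    permits.acquire(); // a waiting virtual thread blocks cheaply
                    try {
                        int now = running.incrementAndGet();
                        maxRunning.accumulateAndGet(now, Math::max);
                        Thread.sleep(5); // stands in for a blocking call such as a JDBC write
                    } finally {
                        running.decrementAndGet();
                        permits.release();
                    }
                    return null;
                });
            }
        } // close() waits for all tasks
        return maxRunning.get();
    }

    public static void main(String[] args) {
        System.out.println("max concurrent tasks: " + maxObservedConcurrency(500, 20));
    }
}
```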

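Lazy Loading and Streaming

A cursor-based reader streams rows from an open database cursor instead of loading the whole result set, so memory use stays flat as the table grows. JdbcCursorItemReader is not thread-safe, which suits partitioned steps: because the bean is @StepScope, each partition gets its own reader instance.

@Bean
@StepScope
public JdbcCursorItemReader<LargeData> reader(@Value("#{stepExecutionContext['partitionId']}") String partitionId) {
    return new JdbcCursorItemReaderBuilder<LargeData>()
        .dataSource(dataSource)
        .sql("SELECT * FROM large_table WHERE partition_id = ?")
        .queryArguments(partitionId)
        .rowMapper(new BeanPropertyRowMapper<>(LargeData.class))
        .fetchSize(1000) // rows per round trip; MySQL Connector/J honors this only with useCursorFetch=true
        .saveState(false) // don't persist the read position; a restart re-reads the partition
        .build();
}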
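What fetchSize controls can be illustrated with a plain-Java model. This is not JDBC code: CursorFetchDemo is a hypothetical iterator that "fetches" rows lazily in pages of fetchSize and counts simulated round trips, keeping only the current page in memory, as a driver that honors fetchSize does with a server-side cursor.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

final class CursorFetchDemo implements Iterator<Long> {
    private final long totalRows;
    private final int fetchSize;
    private long[] page = new long[0];
    private int posInPage;
    private long fetched;    // rows pulled from the simulated database so far
    private int roundTrips;  // simulated network round trips

    CursorFetchDemo(long totalRows, int fetchSize) {
        this.totalRows = totalRows;
        this.fetchSize = fetchSize;
    }

    @Override
    public boolean hasNext() {
        if (posInPage < page.length) {
            return true;
        }
        if (fetched == totalRows) {
            return false;
        }
        fetchNextPage(); // lazily pull the next page only when the current one is used up
        return true;
    }

    @Override
    public Long next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return page[posInPage++];
    }

    private void fetchNextPage() {
        int n = (int) Math.min(fetchSize, totalRows - fetched);
        page = new long[n]; // previous page becomes garbage: only one page is held at a time
        for (int i = 0; i < n; i++) {
            page[i] = fetched + i + 1; // row ids 1..totalRows
        }
        fetched += n;
        posInPage = 0;
        roundTrips++;
    }

    int roundTrips() {
        return roundTrips;
    }

    public static void main(String[] args) {
        CursorFetchDemo cursor = new CursorFetchDemo(10_500, 1000);
        long sum = 0;
        while (cursor.hasNext()) {
            sum += cursor.next();
        }
        System.out.println("sum=" + sum + ", roundTrips=" + cursor.roundTrips());
        // prints sum=55130250, roundTrips=11
    }
}
```

Reading 10,500 rows with a fetch size of 1,000 takes ten full pages plus one page of 500, hence 11 round trips.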

Large‑Scale Data Filtering Job Example

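import java.util.concurrent.Executors;
import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.job.DefaultJobParametersValidator;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.support.SynchronizedItemStreamReader;
import org.springframework.batch.item.support.builder.SynchronizedItemStreamReaderBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.task.TaskExecutor;
import org.springframework.core.task.support.TaskExecutorAdapter;
import org.springframework.dao.DataAccessException;
import org.springframework.transaction.PlatformTransactionManager;

// Spring Boot 3 / Spring Batch 5 style. @EnableBatchProcessing is intentionally omitted: on
// Spring Boot 3 it switches off Batch auto-configuration, including spring.batch.jdbc.initialize-schema.
@Configuration
public class BigDataBatchConfig {

    @Bean
    public TaskExecutor virtualThreadTaskExecutor() {
        return new TaskExecutorAdapter(Executors.newVirtualThreadPerTaskExecutor());
    }

    // FlatFileItemReader is not thread-safe and the step below runs chunks on several threads,
    // so the reader is wrapped in a SynchronizedItemStreamReader.
    @Bean
    @StepScope
    public SynchronizedItemStreamReader<Person> reader(@Value("#{jobParameters['input.file']}") String inputFile) {
        FlatFileItemReader<Person> delegate = new FlatFileItemReaderBuilder<Person>()
            .name("personReader")
            .resource(new FileSystemResource(inputFile))
            .delimited()
            .names("firstName", "lastName", "age", "email")
            .targetType(Person.class) // maps columns to Person properties by name
            .linesToSkip(1)           // skip the CSV header row
            .saveState(false)         // read position is not restartable in a multi-threaded step
            .build();
        return new SynchronizedItemStreamReaderBuilder<Person>()
            .delegate(delegate)
            .build();
    }

    @Bean
    public ItemProcessor<Person, Person> processor() {
        return person -> {
            // keep only ages 18 to 65 inclusive; returning null filters the item out
            if (person.getAge() < 18 || person.getAge() > 65) {
                return null;
            }
            if (!isValidEmail(person.getEmail())) {
                return null;
            }
            person.setFirstName(person.getFirstName().toUpperCase());
            person.setLastName(person.getLastName().toUpperCase());
            return person;
        };
    }

    private boolean isValidEmail(String email) {
        return email != null && email.matches("^[A-Za-z0-9+_.-]+@(.+)$");
    }

    @Bean
    public JdbcBatchItemWriter<Person> writer(DataSource dataSource) {
        return new JdbcBatchItemWriterBuilder<Person>()
            .beanMapped() // binds :firstName, :lastName, ... to the matching Person getters
            .sql("INSERT INTO processed_persons (first_name, last_name, age, email) VALUES (:firstName, :lastName, :age, :email)")
            .dataSource(dataSource)
            .build();
    }

    @Bean
    public Step dataFilteringStep(JobRepository jobRepository,
                                  PlatformTransactionManager transactionManager,
                                  SynchronizedItemStreamReader<Person> reader,
                                  JdbcBatchItemWriter<Person> writer) {
        return new StepBuilder("dataFilteringStep", jobRepository)
            .<Person, Person>chunk(5000, transactionManager)
            .reader(reader)
            .processor(processor())
            .writer(writer)
            .faultTolerant()
            .skipLimit(1000)
            .skip(Exception.class)            // broad: any failing item may be skipped, up to 1,000
            .retryLimit(3)
            .retry(DataAccessException.class) // transient database errors get up to 3 attempts
            .taskExecutor(virtualThreadTaskExecutor())
            .throttleLimit(50)                // at most 50 chunks in flight; deprecated since Spring Batch 5.0
            .build();
    }

    @Bean
    public Job bigDataFilteringJob(JobRepository jobRepository, Step dataFilteringStep) {
        return new JobBuilder("bigDataFilteringJob", jobRepository)
            .incrementer(new RunIdIncrementer())
            .validator(parametersValidator())
            .listener(jobExecutionListener())
            .start(dataFilteringStep)
            .build();
    }

    @Bean
    public DefaultJobParametersValidator parametersValidator() {
        // "input.file" is required; with no optional keys declared, run.id from the incrementer is still accepted
        return new DefaultJobParametersValidator(new String[] {"input.file"}, new String[] {});
    }

    @Bean
    public JobExecutionListener jobExecutionListener() {
        return new JobExecutionListener() {
            @Override
            public void beforeJob(JobExecution jobExecution) {
                System.out.println("Starting large-scale data filtering job");
            }
            @Override
            public void afterJob(JobExecution jobExecution) {
                System.out.println("Data filtering job completed, status: " + jobExecution.getStatus());
            }
        };
    }
}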
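The processor's filtering rules can be exercised without a Spring context by extracting them into a plain function. This is a sketch under one assumption: the Person record below is a hypothetical immutable stand-in for the article's Person class, which uses setters, so the uppercase names are returned in a new record instead of being mutated in place.

```java
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

final class PersonFilter {

    record Person(String firstName, String lastName, int age, String email) {}

    private static boolean isValidEmail(String email) {
        return email != null && email.matches("^[A-Za-z0-9+_.-]+@(.+)$");
    }

    // Same rules as the processor above: null means "filter this item out".
    static Person process(Person p) {
        if (p.age() < 18 || p.age() > 65) {
            return null;
        }
        if (!isValidEmail(p.email())) {
            return null;
        }
        return new Person(p.firstName().toUpperCase(), p.lastName().toUpperCase(), p.age(), p.email());
    }

    static List<Person> processAll(List<Person> input) {
        return input.stream()
            .map(PersonFilter::process)
            .filter(Objects::nonNull) // drop filtered items, as Spring Batch does
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Person> input = List.of(
            new Person("ann", "lee", 30, "ann@example.com"),
            new Person("bob", "ray", 17, "bob@example.com"),  // too young
            new Person("cy", "fox", 40, "not-an-email"));     // invalid email
        System.out.println(processAll(input));
    }
}
```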

Application Configuration

spring:
  datasource:
    url: jdbc:mysql://localhost:3306/batch_db
    username: batch_user
    password: batch_password
  batch:
    job:
      enabled: false
    jdbc:
      initialize-schema: always
  virtual-thread: true # custom flag read by VirtualThreadConfig, not a built-in Spring Boot property
logging:
  level:
    org.springframework.batch: INFO
    com.example.batch: DEBUG

Performance Listener Example

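import java.util.concurrent.TimeUnit;
import org.springframework.batch.core.ItemProcessListener;
import org.springframework.stereotype.Component;

@Component
public class PerformanceListener implements ItemProcessListener<InputData, OutputData> {
    private final ThreadLocal<Long> startTime = new ThreadLocal<>(); // before/after run on the same thread
    private final MetricsService metricsService; // application-specific metrics sink, not part of Spring Batch

    public PerformanceListener(MetricsService metricsService) {
        this.metricsService = metricsService;
    }

    @Override
    public void beforeProcess(InputData item) {
        startTime.set(System.nanoTime()); // monotonic clock, suited to measuring durations
    }

    @Override
    public void afterProcess(InputData item, OutputData result) {
        long durationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime.get());
        startTime.remove();
        metricsService.recordProcessingTime(durationMs);
    }

    @Override
    public void onProcessError(InputData item, Exception e) {
        startTime.remove(); // afterProcess is not called when processing fails
    }
}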
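The listener calls a metricsService that the article does not define. The class below is a hypothetical, minimal implementation, not a Spring or Micrometer API: since many virtual threads may record timings at once, it uses LongAdder and LongAccumulator, which accept concurrent updates without explicit locking.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder;

final class InMemoryMetricsService {
    private final LongAdder count = new LongAdder();
    private final LongAdder totalMs = new LongAdder();
    private final LongAccumulator maxMs = new LongAccumulator(Math::max, 0);

    // Safe to call from many threads at the same time.
    void recordProcessingTime(long durationMs) {
        count.increment();
        totalMs.add(durationMs);
        maxMs.accumulate(durationMs);
    }

    long count() {
        return count.sum();
    }

    long maxMs() {
        return maxMs.get();
    }

    // count and total are read separately, so the average is only approximate while writers are active.
    double averageMs() {
        long n = count.sum();
        return n == 0 ? 0.0 : (double) totalMs.sum() / n;
    }

    public static void main(String[] args) {
        InMemoryMetricsService metrics = new InMemoryMetricsService();
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 1; i <= 1_000; i++) {
                long duration = i % 10; // synthetic durations of 0 to 9 ms
                executor.submit(() -> metrics.recordProcessingTime(duration));
            }
        } // close() waits for all recordings
        System.out.println(metrics.count() + " samples, avg " + metrics.averageMs()
            + " ms, max " + metrics.maxMs() + " ms");
        // prints 1000 samples, avg 4.5 ms, max 9 ms
    }
}
```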
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.
