Mastering Spring Batch: Real-World Use Cases and Hands‑On Guide
This comprehensive guide explains why batch processing is essential, walks through typical banking, e‑commerce, logging and medical data scenarios, details Spring Batch's core architecture and components, provides step‑by‑step setup and code examples, and presents a production‑grade bank reconciliation case with monitoring and troubleshooting tips.
Why Batch Processing?
1. Application Scenarios
Scenario 1: Daily Interest Calculation for Banks
Pain Point: Manually scanning millions of accounts at midnight is slow and prone to omissions.
Spring Batch Solution: Partitioned reads and batch interest calculation with automatic retries.
Actual Result: Processing time reduced from 4 hours to 23 minutes after system refactor.
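The per-account computation inside such a partitioned step can be sketched in plain Java. The class and method names here are illustrative, and the day-count and rounding conventions are assumptions (real products define their own):

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

// Hypothetical per-account calculation that each partition would apply.
public class DailyInterest {
    // Daily interest = balance * annualRate / 365, rounded half-up to 2 decimals.
    // BigDecimal (never double) keeps monetary arithmetic exact.
    public static BigDecimal dailyInterest(BigDecimal balance, BigDecimal annualRate) {
        return balance.multiply(annualRate)
                .divide(BigDecimal.valueOf(365), 2, RoundingMode.HALF_UP);
    }
}
```

Each partition would call this once per account in its id range, which is what makes the workload trivially parallelizable.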
Scenario 2: E‑Commerce Order Archiving
-- Traditional SQL example (performance issue)
DELETE FROM active_orders
WHERE create_time < '2023-01-01'
LIMIT 5000; -- Must loop until no data remains
Problem: Deleting millions of rows locks the table.
Correct Approach: Use Spring Batch to page‑read, write to a history table, then batch delete.
Scenario 3: Log Analysis
Typical Need: Analyze Nginx API response time distribution.
Special Challenge: Memory control when processing GB‑size text files.
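A sketch of the memory-control idea: stream the file line by line instead of loading it whole, so heap use stays flat however large the log grows. The log format (response time in ms as the last whitespace-separated field) and the bucket boundaries are assumptions:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.util.Map;
import java.util.TreeMap;

// Streams the log one line at a time: memory use is constant regardless of file size.
public class ResponseTimeHistogram {
    // Assumes the response time (ms) is the last whitespace-separated field.
    public static Map<String, Integer> bucketize(Reader source) throws IOException {
        Map<String, Integer> buckets = new TreeMap<>();
        try (BufferedReader reader = new BufferedReader(source)) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (line.isBlank()) continue;            // tolerate empty lines
                String[] fields = line.trim().split("\\s+");
                long ms = Long.parseLong(fields[fields.length - 1]);
                String bucket = ms < 100 ? "<100ms" : ms < 500 ? "100-500ms" : ">=500ms";
                buckets.merge(bucket, 1, Integer::sum);
            }
        }
        return buckets;
    }
}
```

This is the same principle a FlatFileItemReader applies: only the current line (plus one chunk of mapped items) is ever resident in memory.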
Scenario 4: Medical Data Migration
Special Requirement: Legacy system must stay online during migration.
Solution: Spring Batch incremental migration mode.
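The incremental mode can be illustrated with a hypothetical watermark-based migrator (class names and structure assumed): each run copies only rows beyond the last committed id, so the legacy system stays online and writable between batches:

```java
import java.util.ArrayList;
import java.util.List;

// Conceptual sketch: each pass copies only records whose id is above the last
// persisted watermark, in small batches, so the legacy system keeps serving
// traffic while the migration catches up.
public class IncrementalMigrator {
    public static final class Row {
        final long id;
        final String payload;
        public Row(long id, String payload) { this.id = id; this.payload = payload; }
    }

    private long watermark = 0; // last migrated id; persisted between runs in practice

    // Source is assumed ordered by id, as a keyed query would return it.
    public List<Row> migrateBatch(List<Row> source, int batchSize) {
        List<Row> moved = new ArrayList<>();
        for (Row r : source) {
            if (r.id > watermark && moved.size() < batchSize) {
                moved.add(r);
            }
        }
        if (!moved.isEmpty()) {
            watermark = moved.get(moved.size() - 1).id; // advance the checkpoint
        }
        return moved;
    }
}
```

In Spring Batch the watermark would live in the step's ExecutionContext, which is exactly what makes the migration resumable.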
Traditional Pain Points
Complex resource management.
Fault‑tolerance black holes.
Monitoring blind spots (cannot answer "how much is left?").
// Typical multithreading error example
ExecutorService executor = Executors.newFixedThreadPool(8);
try {
while (hasNextPage()) {
List<Data> page = fetchNextPage();
executor.submit(() -> processPage(page)); // Unbounded queue: pages pile up in memory
}
} finally {
executor.shutdown(); // Forgetting this leads to thread pile‑up
}
Common issue: thread‑pool misconfiguration causes OOM and database connection leaks.
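A safer sketch of the same loop, assuming a comparable page-processing workload: a bounded queue plus CallerRunsPolicy applies back-pressure so pages cannot pile up faster than workers drain them, and shutdown is always awaited:

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedPageProcessor {
    public static int processAll(List<List<Integer>> pages) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                4, 4, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(8),                // bounded task queue
                new ThreadPoolExecutor.CallerRunsPolicy()); // producer slows down when full
        AtomicInteger processed = new AtomicInteger();
        try {
            for (List<Integer> page : pages) {
                executor.submit(() -> processed.addAndGet(page.size()));
            }
        } finally {
            executor.shutdown();                            // always shut down
            try {
                executor.awaitTermination(1, TimeUnit.MINUTES); // and wait for completion
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return processed.get();
    }
}
```

Spring Batch removes the need for this hand-rolled pooling entirely, which is the point of the sections that follow.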
// Pseudo‑code: fragile error handling
for (int i = 0; i < 3; i++) {
try {
processBatch();
break;
} catch (Exception e) {
if (i == 2) sendAlert(); // Simple retry cannot handle partial success
}
}
Real case: a payment system duplicated payouts due to unhandled partial failures.
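A minimal sketch of what handling partial success means: record which items already succeeded so a retry never re-executes them. The item ids and the attempt predicate are illustrative:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

public class IdempotentRetry {
    // Retries only the items that have not yet succeeded, so a partial failure
    // cannot cause duplicate processing (e.g. duplicate payouts).
    public static Set<String> processWithRetry(List<String> items,
                                               Predicate<String> attempt,
                                               int maxRetries) {
        Set<String> done = new HashSet<>();
        for (int round = 0; round < maxRetries && done.size() < items.size(); round++) {
            for (String item : items) {
                if (!done.contains(item) && attempt.test(item)) {
                    done.add(item); // record success so a retry never repeats it
                }
            }
        }
        return done;
    }
}
```

Spring Batch's skip/retry machinery provides this bookkeeping per item, which the naive `for (i < 3)` loop above cannot.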
# Hard‑coded configuration example
batch.size=1000
input.path=/data/
output.path=/data/out
Problem root: parameter changes require redeployment; environment configs get mixed.
Spring Batch Core Architecture
1. Four Core Components Deep Dive
Component 1: Job (Job Factory)
Core Role: Define a complete batch pipeline (e.g., monthly report generation).
Real Example: A bank's end‑of‑day reconciliation Job with three Steps.
@Bean
public Job reconciliationJob() {
return jobBuilderFactory.get("dailyReconciliation")
.start(downloadBankFileStep())
.next(validateDataStep())
.next(generateReportStep())
.build();
}
Component 2: Step (Pipeline Assembly)
Design pattern: chunk‑oriented processing.
@Bean
public Step importStep() {
return stepBuilderFactory.get("csvImport")
.<User, User>chunk(500)
.reader(csvReader())
.processor(validationProcessor())
.writer(dbWriter())
.faultTolerant()
.skipLimit(10)
.skip(DataIntegrityViolationException.class)
.build();
}
Component 3: ItemReader (Data Mover)
@Bean
public FlatFileItemReader<User> csvReader() {
return new FlatFileItemReaderBuilder<User>()
.name("userReader")
.resource(new FileSystemResource("data/users.csv"))
.delimited().delimiter(",")
.names("id","name","email")
.fieldSetMapper(new BeanWrapperFieldSetMapper<User>() {{ setTargetType(User.class); }})
.linesToSkip(1)
.build();
}
Component 4: ItemWriter (Data Collector)
@Bean
public CompositeItemWriter<User> compositeWriter() {
return new CompositeItemWriterBuilder<User>()
.delegates(dbWriter(), logWriter(), mqWriter())
.build();
}
@Bean
public JdbcBatchItemWriter<User> dbWriter() {
return new JdbcBatchItemWriterBuilder<User>()
.dataSource(dataSource)
.sql("INSERT INTO users (name,email) VALUES (:name,:email)")
.beanMapped()
.build();
}
Step‑by‑Step Development Guide
1. Environment Setup
<!-- Complete POM configuration -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<!-- 2.7.x matches the JobBuilderFactory/StepBuilderFactory style used throughout
this guide; Spring Boot 3 (Spring Batch 5) removed those factories -->
<version>2.7.18</version>
</parent>
<dependencies>
<!-- Batch core dependency -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<!-- In‑memory DB (replace with MySQL in prod) -->
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
<!-- Lombok for boilerplate reduction -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
# application.properties
spring.batch.jdbc.initialize-schema=always
spring.datasource.url=jdbc:h2:mem:testdb
spring.datasource.driverClassName=org.h2.Driver
2. First Batch Job
Domain model class:
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User {
private String name;
private int age;
private String email;
}
Complete Job configuration:
@Configuration
@EnableBatchProcessing
public class BatchConfig {
@Autowired private JobBuilderFactory jobBuilderFactory;
@Autowired private StepBuilderFactory stepBuilderFactory;
@Bean
public Job importUserJob() {
return jobBuilderFactory.get("importUserJob")
.start(csvProcessingStep())
.build();
}
@Bean
public Step csvProcessingStep() {
return stepBuilderFactory.get("csvProcessing")
.<User, User>chunk(100)
.reader(userReader())
.processor(userProcessor())
.writer(userWriter())
.build();
}
@Bean
public FlatFileItemReader<User> userReader() {
return new FlatFileItemReaderBuilder<User>()
.name("userReader")
.resource(new ClassPathResource("users.csv"))
.delimited().delimiter(",")
.names("name","age","email")
.targetType(User.class)
.linesToSkip(1)
.build();
}
@Bean
public ItemProcessor<User, User> userProcessor() {
return user -> {
if (user.getAge() < 0) {
throw new IllegalArgumentException("Age cannot be negative: " + user);
}
// User has no builder (only @Data); normalize the email via the generated setter
user.setEmail(user.getEmail().toLowerCase());
return user;
};
}
@Bean
public JdbcBatchItemWriter<User> userWriter(DataSource dataSource) {
return new JdbcBatchItemWriterBuilder<User>()
.dataSource(dataSource)
.sql("INSERT INTO users (name, age, email) VALUES (:name, :age, :email)")
.beanMapped()
.build();
}
}
Sample CSV (src/main/resources/users.csv):
name,age,email
张三,25,[email protected]
李四,30,[email protected]
王五,-5,[email protected]
3. Execution Visualization
4. Run Result Verification
Console output shows job launch, step execution, and validation error for the negative age record.
2023-10-01 10:00:00 INFO o.s.b.c.l.support.SimpleJobLauncher - Job: [SimpleJob: [name=importUserJob]] launched
2023-10-01 10:00:05 INFO o.s.batch.core.job.SimpleStepHandler - Executing step: [csvProcessing]
2023-10-01 10:00:15 ERROR o.s.batch.core.step.AbstractStep - Encountered an error executing step csvProcessing
java.lang.IllegalArgumentException: Age cannot be negative: User(name=王五, age=-5, [email protected])
Database query confirms only valid rows were inserted.
SELECT * FROM users;
Real‑World Case: Bank Transaction Reconciliation
1. Scenario Requirements
Dual data source (file + database).
Millions of records need efficient comparison.
Differences must be quickly persisted.
Solution must run in a distributed environment.
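Before the full Spring Batch wiring, the comparison itself can be sketched standalone. The class and enum names are illustrative (they mirror the statuses defined in the domain model below), and each side is indexed by transaction id:

```java
import java.math.BigDecimal;
import java.util.LinkedHashMap;
import java.util.Map;

// Core of the comparison, as a sketch rather than the Spring Batch processor:
// index both sides by transaction id, then classify each record.
public class ReconMatcher {
    public enum Result { MATCHED, AMOUNT_DIFF, ONLY_IN_BANK, ONLY_IN_SYSTEM }

    public static Map<String, Result> reconcile(Map<String, BigDecimal> bank,
                                                Map<String, BigDecimal> system) {
        Map<String, Result> out = new LinkedHashMap<>();
        for (Map.Entry<String, BigDecimal> e : bank.entrySet()) {
            BigDecimal sys = system.get(e.getKey());
            if (sys == null) {
                out.put(e.getKey(), Result.ONLY_IN_BANK);
            } else if (e.getValue().compareTo(sys) != 0) {
                out.put(e.getKey(), Result.AMOUNT_DIFF);
            } else {
                out.put(e.getKey(), Result.MATCHED);
            }
        }
        // Anything the bank file never mentioned exists only on the system side
        for (String id : system.keySet()) {
            if (!bank.containsKey(id)) {
                out.put(id, Result.ONLY_IN_SYSTEM);
            }
        }
        return out;
    }
}
```

For millions of records, the indexing side would be a database rather than an in-memory map, but the classification logic is the same one the processors below implement.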
2. Full Architecture Design
3. Domain Model Definition
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Transaction {
// Common fields
private String transactionId;
private LocalDateTime tradeTime;
private BigDecimal amount;
// Bank side
private String bankSerialNo;
private BigDecimal bankAmount;
// Internal system side
private String internalOrderNo;
private BigDecimal systemAmount;
// Reconciliation result
private ReconStatus status;
private String discrepancyType;
private AlertLevel alertLevel; // set by DiscrepancyClassifier
}
public enum ReconStatus {
MATCHED, // Data matches
AMOUNT_DIFF, // Amount mismatch
STATUS_DIFF, // Status mismatch
ONLY_IN_BANK, // Bank‑only record
ONLY_IN_SYSTEM // System‑only record
}
4. Complete Job Configuration
@Configuration
@EnableBatchProcessing
public class BankReconJobConfig {
@Bean
public Job bankReconciliationJob(Step downloadStep, Step reconStep, Step reportStep) {
return jobBuilderFactory.get("bankReconciliationJob")
.start(downloadStep)
.next(reconStep)
.next(reportStep)
.build();
}
@Bean
public Step downloadStep() {
return stepBuilderFactory.get("downloadStep")
.tasklet((contribution, chunkContext) -> {
sftpService.download("/bank/recon/20231001.csv"); // sftpService: an injected SFTP client (assumed)
return RepeatStatus.FINISHED;
})
.build();
}
@Bean
public Step reconStep() {
return stepBuilderFactory.get("reconStep")
.<Transaction, Transaction>chunk(1000)
.reader(compositeReader())
.processor(compositeProcessor())
.writer(compositeWriter())
.faultTolerant()
.skipLimit(100)
.skip(DataIntegrityViolationException.class)
.retryLimit(3)
.retry(DeadlockLoserDataAccessException.class)
.build();
}
// Note: this composite reader is illustrative; Spring Batch 4.x has no built-in
// CompositeItemReader, so in practice this would be a custom class that merges
// both sources (only recent Spring Batch versions ship one).
@Bean
public CompositeItemReader<Transaction> compositeReader() {
return new CompositeItemReaderBuilder<Transaction>()
.delegates(bankFileReader(), internalDbReader())
.build();
}
@Bean
public FlatFileItemReader<Transaction> bankFileReader() {
return new FlatFileItemReaderBuilder<Transaction>()
.name("bankFileReader")
.resource(new FileSystemResource("recon/20231001.csv"))
.delimited()
.names("transactionId","tradeTime","amount","bankSerialNo")
.fieldSetMapper(fieldSet -> {
Transaction t = new Transaction();
t.setTransactionId(fieldSet.readString("transactionId"));
t.setBankSerialNo(fieldSet.readString("bankSerialNo"));
t.setBankAmount(fieldSet.readBigDecimal("amount"));
return t;
})
.build();
}
@Bean
public JdbcCursorItemReader<Transaction> internalDbReader() {
return new JdbcCursorItemReaderBuilder<Transaction>()
.name("internalDbReader")
.dataSource(internalDataSource)
.sql("SELECT order_no, amount, status FROM transactions WHERE trade_date = ?")
.rowMapper((rs, rowNum) -> {
Transaction t = new Transaction();
t.setInternalOrderNo(rs.getString("order_no"));
t.setSystemAmount(rs.getBigDecimal("amount"));
return t;
})
.preparedStatementSetter(ps -> ps.setString(1, "2023-10-01"))
.build();
}
@Bean
public CompositeItemProcessor<Transaction, Transaction> compositeProcessor() {
List<ItemProcessor<?, ?>> delegates = new ArrayList<>();
delegates.add(new DataMatchingProcessor());
delegates.add(new DiscrepancyClassifier());
return new CompositeItemProcessorBuilder<Transaction, Transaction>()
.delegates(delegates)
.build();
}
@Bean
public CompositeItemWriter<Transaction> compositeWriter() {
return new CompositeItemWriterBuilder<Transaction>()
.delegates(discrepancyDbWriter(), alertMessageWriter())
.build();
}
}
5. Core Processor Implementations
public class DataMatchingProcessor implements ItemProcessor<Transaction, Transaction> {
@Override
public Transaction process(Transaction item) {
if (item.getBankSerialNo() == null) {
item.setStatus(ReconStatus.ONLY_IN_SYSTEM);
} else if (item.getInternalOrderNo() == null) {
item.setStatus(ReconStatus.ONLY_IN_BANK);
} else {
compareAmounts(item);
compareStatuses(item);
}
return item;
}
private void compareAmounts(Transaction t) {
if (t.getBankAmount().compareTo(t.getSystemAmount()) != 0) {
t.setDiscrepancyType("AMOUNT_MISMATCH");
t.setStatus(ReconStatus.AMOUNT_DIFF);
t.setAmount(t.getBankAmount().subtract(t.getSystemAmount()).abs());
}
}
private void compareStatuses(Transaction t) {
// transactionService is assumed to be injected into this processor elsewhere
String internalStatus = transactionService.getStatus(t.getInternalOrderNo());
if (!"SETTLED".equals(internalStatus)) {
t.setDiscrepancyType("STATUS_MISMATCH");
t.setStatus(ReconStatus.STATUS_DIFF);
}
}
}
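compareAmounts above deliberately uses compareTo rather than equals; a standalone check of the difference:

```java
import java.math.BigDecimal;

public class AmountComparison {
    public static boolean sameAmount(BigDecimal a, BigDecimal b) {
        // compareTo ignores scale: 2.50 and 2.5 are the same amount.
        // equals() would treat them as different because their scales differ.
        return a.compareTo(b) == 0;
    }
}
```

Using equals here would flag spurious AMOUNT_DIFF records whenever the bank file and the internal system store amounts at different scales.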
public class DiscrepancyClassifier implements ItemProcessor<Transaction, Transaction> {
@Override
public Transaction process(Transaction item) {
if (item.getStatus() != ReconStatus.MATCHED) {
item.setAlertLevel(calculateAlertLevel(item));
}
return item;
}
private AlertLevel calculateAlertLevel(Transaction t) {
if (t.getAmount().compareTo(new BigDecimal("1000000")) > 0) {
return AlertLevel.CRITICAL;
}
return AlertLevel.WARNING;
}
}
6. Difference Report Generation Step
@Bean
public Step reportStep() {
return stepBuilderFactory.get("reportStep")
.<Transaction, Transaction>chunk(1000)
.reader(discrepancyReader())
.writer(excelWriter())
.build();
}
@Bean
public JdbcPagingItemReader<Transaction> discrepancyReader() {
return new JdbcPagingItemReaderBuilder<Transaction>()
.name("discrepancyReader")
.dataSource(reconDataSource)
.selectClause("SELECT *")
.fromClause("FROM discrepancy_records")
.whereClause("WHERE recon_date = '2023-10-01'")
.sortKeys(Collections.singletonMap("transaction_id", Order.ASCENDING))
.rowMapper(new BeanPropertyRowMapper<>(Transaction.class))
.build();
}
// Note: ExcelFileItemWriter is not a Spring Batch core class; this assumes a
// custom or third-party Excel writer exposing a builder API.
@Bean
public ExcelFileItemWriter<Transaction> excelWriter() {
return new ExcelFileItemWriterBuilder<Transaction>()
.name("excelWriter")
.resource(new FileSystemResource("reports/2023-10-01.xlsx"))
.sheetName("差异报告")
.headers(new String[]{"交易ID","差异类型","金额差异","告警级别"})
.fieldExtractor(item -> new Object[]{
item.getTransactionId(),
item.getDiscrepancyType(),
item.getAmount(),
item.getAlertLevel()
})
.build();
}
7. Performance Optimizations
# Application configuration
# Disable auto-start of jobs on application boot
spring.batch.job.enabled=false
spring.batch.jdbc.initialize-schema=never
# Chunk size is configured on the Step, not via a built-in Spring property;
# a custom key like this must be read by your own job configuration
batch.chunk.size=2000
spring.datasource.hikari.maximum-pool-size=20
spring.jpa.properties.hibernate.jdbc.batch_size=1000
Production‑Grade Features
1. Fault‑Tolerance Mechanisms
@Bean
public Step secureStep() {
return stepBuilderFactory.get("secureStep")
.<Input, Output>chunk(500)
.reader(jdbcReader())
.processor(secureProcessor())
.writer(restApiWriter())
.faultTolerant()
.retryLimit(3)
.retry(ConnectException.class)
.retry(DeadlockLoserDataAccessException.class)
.skipLimit(100)
.skip(DataIntegrityViolationException.class)
.skip(InvalidDataAccessApiUsageException.class)
.noRollback(ValidationException.class)
.listener(new ErrorLogListener())
.build();
}2. Performance Strategies for Tens of Millions of Records
Strategy 1: Parallel Step Execution
@Bean
public Job parallelJob() {
return jobBuilderFactory.get("parallelJob")
.start(step1())
.split(new SimpleAsyncTaskExecutor())
.add(step2(), step3())
.build();
}
Strategy 2: Partitioning
@Bean
public Step masterStep() {
return stepBuilderFactory.get("masterStep")
.partitioner("slaveStep", partitioner())
.gridSize(10)
.taskExecutor(new ThreadPoolTaskExecutor())
.build();
}
@Bean
public Partitioner partitioner() {
return gridSize -> {
Map<String, ExecutionContext> result = new HashMap<>();
long total = getTotalRecordCount();
long range = total / gridSize;
for (int i = 0; i < gridSize; i++) {
ExecutionContext ctx = new ExecutionContext();
ctx.putLong("min", i * range);
// Last partition absorbs the remainder so no records are dropped
ctx.putLong("max", (i == gridSize - 1) ? total : (i + 1) * range);
result.put("partition" + i, ctx);
}
return result;
};
}
Strategy 3: Asynchronous ItemProcessor
@Bean
public Step asyncStep() {
return stepBuilderFactory.get("asyncStep")
.<Input, Output>chunk(1000)
.reader(reader())
.processor(asyncItemProcessor())
.writer(writer())
.build();
}
@Bean
public AsyncItemProcessor<Input, Output> asyncItemProcessor() {
AsyncItemProcessor<Input, Output> async = new AsyncItemProcessor<>();
async.setDelegate(syncProcessor());
async.setTaskExecutor(new SimpleAsyncTaskExecutor()); // or an initialized ThreadPoolTaskExecutor bean
return async;
}
@Bean
public AsyncItemWriter<Output> asyncItemWriter() {
AsyncItemWriter<Output> async = new AsyncItemWriter<>();
async.setDelegate(syncWriter());
return async;
}
3. Performance Test Results
Monitoring & Management (Production‑Level)
1. Monitoring Stack Upgrade (Spring Batch Admin Alternative)
<!-- Add Micrometer Prometheus dependency -->
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
</dependency>
@Bean
public MeterRegistryCustomizer<MeterRegistry> metricsCommonTags() {
return registry -> registry.config().commonTags("application", "batch-service");
}
public class BatchMetricsListener extends StepExecutionListenerSupport {
private final Counter processedRecords = Counter.builder("batch.records.processed")
.description("Total processed records")
.register(Metrics.globalRegistry);
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
processedRecords.increment(stepExecution.getWriteCount());
return stepExecution.getExitStatus();
}
}
2. Metadata Table Structure Details
Key tables:
BATCH_JOB_INSTANCE: stores unique job fingerprints; the same parameters map to one instance.
BATCH_JOB_EXECUTION_PARAMS: persists the parameters for each run.
BATCH_STEP_EXECUTION_CONTEXT: holds step context data, crucial for restart recovery.
3. Custom Monitoring Dashboard
-- Recent 5 job executions
SELECT j.JOB_NAME, e.START_TIME, e.END_TIME,
TIMEDIFF(e.END_TIME, e.START_TIME) AS DURATION,
s.READ_COUNT, s.WRITE_COUNT
FROM BATCH_JOB_EXECUTION e
JOIN BATCH_JOB_INSTANCE j ON e.JOB_INSTANCE_ID = j.JOB_INSTANCE_ID
JOIN BATCH_STEP_EXECUTION s ON e.JOB_EXECUTION_ID = s.JOB_EXECUTION_ID
ORDER BY e.START_TIME DESC
LIMIT 5;
FAQ – Ultimate Guide
1. Memory OOM When Processing 10GB CSV
Scenario: OOM occurs while reading a massive CSV file.
@Bean
@StepScope
public FlatFileItemReader<LargeRecord> largeFileReader(@Value("#{jobParameters['filePath']}") String filePath) {
return new FlatFileItemReaderBuilder<LargeRecord>()
.resource(new FileSystemResource(filePath))
.lineMapper(new DefaultLineMapper<>() {{
// Field names are assumed here; without names the tokenizer cannot map to bean properties
setLineTokenizer(new DelimitedLineTokenizer() {{ setNames("id", "payload"); }});
setFieldSetMapper(new BeanWrapperFieldSetMapper<>() {{ setTargetType(LargeRecord.class); }});
}})
.linesToSkip(1)
.strict(false) // Allow empty trailing lines
.saveState(false) // Disable state persistence
.build();
}
// Recommended JVM options
// -XX:+UseG1GC -Xmx4g -XX:MaxGCPauseMillis=200
2. Advanced Scheduling Configuration
@Configuration
@EnableScheduling
public class ScheduleConfig {
@Autowired private JobLauncher jobLauncher;
@Autowired private Job reportJob;
// Run on weekdays at 02:00
@Scheduled(cron = "0 0 2 * * MON-FRI")
public void dailyJob() throws Exception {
JobParameters params = new JobParametersBuilder()
.addString("date", LocalDate.now().toString())
.toJobParameters();
jobLauncher.run(reportJob, params);
}
// Hourly polling example
@Scheduled(fixedRate = 3600000)
public void pollJob() throws Exception {
if (checkNewDataExists()) {
// A unique parameter is required so each trigger creates a new JobInstance;
// relaunching with identical parameters would be rejected
JobParameters params = new JobParametersBuilder()
.addLong("run.ts", System.currentTimeMillis())
.toJobParameters();
jobLauncher.run(dataProcessJob, params);
}
}
// Graceful stop
public void stopJob(Long executionId) {
JobExecution execution = jobExplorer.getJobExecution(executionId);
if (execution.isRunning()) {
execution.setStatus(BatchStatus.STOPPING);
jobRepository.update(execution);
}
}
}
3. Common Issues
Q: How to rerun a failed job?
-- Step 1: Find failed execution IDs
SELECT * FROM BATCH_JOB_EXECUTION WHERE STATUS = 'FAILED';
// Step 2: Restart with the SAME job parameters; JobOperator.restart() resumes
// the failed execution from its last commit point (adding a new parameter
// would create a fresh JobInstance instead of a restart)
Long newExecutionId = jobOperator.restart(failedExecutionId);
Q: What if power loss occurs during processing?
Spring Batch stores step execution context in BATCH_STEP_EXECUTION_CONTEXT, allowing job restart from the last committed chunk.
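The restart behavior can be illustrated with a toy simulation (not Spring Batch internals): progress is committed once per chunk, so after a crash, processing resumes from the last commit point rather than from the beginning. Uncommitted work from the interrupted chunk is redone:

```java
import java.util.List;

public class ChunkRestartDemo {
    private int committedIndex = 0; // stands in for BATCH_STEP_EXECUTION_CONTEXT

    // Processes items in chunks; a failure at failAt leaves committedIndex at
    // the last completed chunk boundary. Pass failAt = -1 for a clean run.
    public int run(List<String> items, int chunkSize, int failAt) {
        int processedThisRun = 0;
        for (int i = committedIndex; i < items.size(); i++) {
            if (i == failAt) {
                return processedThisRun; // simulate crash; uncommitted work is lost
            }
            processedThisRun++;
            if ((i + 1) % chunkSize == 0) {
                committedIndex = i + 1; // commit point
            }
        }
        committedIndex = items.size();
        return processedThisRun;
    }
}
```

This is also why writers should be idempotent or transactional: the items between the last commit and the crash are processed again on restart.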
Q: How to pass dynamic parameters?
// Command line
java -jar batch.jar --spring.batch.job.name=dataImportJob date=2023-10-01
// Programmatic launch
JobParameters params = new JobParametersBuilder()
.addString("mode", "forceUpdate")
.addLong("timestamp", System.currentTimeMillis())
.toJobParameters();
jobLauncher.run(importJob, params);
4. Performance Tuning Checklist
Database Optimizations
Add batch‑processing indexes.
Configure connection pool size appropriately.
Enable JDBC batch mode.
JVM Optimizations
-XX:+UseStringDeduplication
-XX:+UseCompressedOops
-XX:MaxMetaspaceSize=512m
Batch Settings
spring.batch.jdbc.initialize-schema=never
spring.batch.job.enabled=false
spring.jpa.open-in-view=false
