
Evolution of a Batch Processing System: From Centralized to Configurable and Localized Architectures

This article examines the evolution of a merchant batch processing system, detailing its transition from a centralized, tightly‑coupled architecture to a configurable, SPI‑based design and finally to a localized task‑reporting model, while discussing code reuse, scheduling strategies, isolation techniques, and performance challenges.

The article begins by describing the business need for batch operations in e‑commerce platforms, where a small percentage of high‑value merchants generate the majority of sales and require efficient bulk handling of products, inventory, and orders.

It then outlines three architectural stages of the batch processing system from the perspective of a developer named Xiao Wang: centralized (process extension), platform‑based (configuration registration), and localized (task reporting).

Centralized: Process Extension – Xiao Wang initially implements similar batch workflows for pricing and other operations, discovers repetitive steps, and refactors common logic into reusable code using a factory pattern. The following code shows the handler factory:

@Component
public class BpcProcessHandlerFactory {
    @Autowired
    private ApplicationContext applicationContext;
    private static ConcurrentHashMap<String, ImportService> templateMap = new ConcurrentHashMap<>();

    @PostConstruct
    private void init() {
        Map<String, ImportService> importServiceMap = applicationContext.getBeansOfType(ImportService.class);
        for (ImportService importService : importServiceMap.values()) {
            initImportService(importService);
        }
    }

    private void initImportService(ImportService importService) {
        // ...
    }

    public BpcProcessHandler getBpcProcessHandler(String templateCode) {
        if (StringUtils.isBlank(templateCode) || !templateMap.containsKey(templateCode)) {
            return null;
        }
        return templateMap.get(templateCode).newProcessHandler();
    }
}
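To make the factory concrete, the sketch below shows what an `ImportService` implementation and a minimal registry might look like; the `PriceImportService` class, its template code, and the `HandlerRegistry` helper are illustrative assumptions, not code from the article.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Each business scenario registers under a template code; the registry
// hands out a fresh handler per request, mirroring the factory above.
interface BpcProcessHandler {
    void process();
}

interface ImportService {
    String templateCode();
    BpcProcessHandler newProcessHandler();
}

// Hypothetical business implementation for a bulk price update.
class PriceImportService implements ImportService {
    public String templateCode() { return "PRICE_BATCH_UPDATE"; }
    public BpcProcessHandler newProcessHandler() {
        return () -> System.out.println("updating prices row by row");
    }
}

class HandlerRegistry {
    private final Map<String, ImportService> templateMap = new ConcurrentHashMap<>();

    void register(ImportService service) {
        templateMap.put(service.templateCode(), service);
    }

    BpcProcessHandler lookup(String templateCode) {
        ImportService svc = templateMap.get(templateCode);
        return svc == null ? null : svc.newProcessHandler();
    }
}
```

In the Spring version above, `@PostConstruct` plays the role of the explicit `register` calls: every `ImportService` bean found in the context is registered automatically.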

The corresponding service that executes a batch process uses the factory to obtain a handler and runs a loop over data:

@Service
public class BpcProcessService {
    @Autowired
    private BpcProcessHandlerFactory bpcProcessHandlerFactory;

    public String doBpcProcess(BpcProcessReq req) throws BpcProcessException {
        BpcProcessHandler bpcProcessHandler = bpcProcessHandlerFactory.getBpcProcessHandler(req.getTaskTemplateCode());
        if (bpcProcessHandler == null) {
            throw new BpcProcessException("Template not found");
        }
        String taskId = createTask();
        downloadFromOss();
        int loopCnt = 0;
        int maxLoopCnt = bpcProcessHandler.getMaxLoopCnt();
        while (loopCnt++ < maxLoopCnt) {
            bpcProcessHandler.process();
            updateTaskProcess();
        }
        updateTaskStatus();
        return taskId;
    }
}

Platform‑Based: Configuration Registration – To reduce code duplication, Xiao Wang introduces a configuration‑driven approach where business‑specific logic is provided via SPI extensions. The configuration includes Excel format, SPI information, and field mappings. A generic Dubbo invocation helper demonstrates how the system calls the configured SPI:

@Override
public String invoke(ServiceDefinition serviceDefinition, Object inputParam) {
    GenericService genericService = DubboConfig.buildService(serviceDefinition.getInterfaceName(), serviceDefinition.getTimeout());
    String[] parameterTypes = new String[] {serviceDefinition.getRequestType().getClassName()};
    Object[] args = new Object[] {inputParam};
    long startTime = System.currentTimeMillis();
    Object result;
    try {
        log.info("invoke service={}#{} with request={}", serviceDefinition.getInterfaceName(), serviceDefinition.getMethod(), JSON.toJSONString(args));
        result = genericService.$invoke(serviceDefinition.getMethod(), parameterTypes, args);
        long endTime = System.currentTimeMillis();
        digestLog(serviceDefinition, true, endTime - startTime);
        log.info("invoke service={}#{} with result={}", serviceDefinition.getInterfaceName(), serviceDefinition.getMethod(), JSON.toJSONString(result));
    } catch (Exception ex) {
        long endTime = System.currentTimeMillis();
        digestLog(serviceDefinition, false, endTime - startTime);
        log.error("failed to dubbo invoke: {}#{} with error {}", serviceDefinition.getInterfaceName(), serviceDefinition.getMethod(), ex.getMessage());
        throw new DependencyException(ErrorCodeEnum.DEFAULT_DEPENDENCY_ERROR.getCode(), ex.getMessage(), ex);
    }
    if (result == null) {
        throw new DependencyException(ErrorCodeEnum.DEFAULT_BIZ_ERROR.getCode(), "the result is null");
    }
    Map resultMap = JSON.parseObject(JSON.toJSONString(result), Map.class);
    processError(resultMap);
    Object data = resultMap.get("data");
    return JSON.toJSONString(data);
}

The simplified execution flow now fetches the template, creates a task, downloads files, parses data, and invokes the SPI within a loop, mirroring the earlier extension‑based logic but driven by configuration.
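The configuration-driven step can be sketched as follows; the `TemplateConfig` fields and the `mapRow` helper are illustrative names chosen for this sketch, not code from the article, but they show how a field mapping turns a parsed Excel row into an SPI request without any scenario-specific code in the batch center.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical shape of the per-scenario configuration the batch center
// stores: Excel format, SPI coordinates, and field mappings.
class TemplateConfig {
    String templateCode;
    List<String> excelHeaders;        // expected columns in the uploaded Excel
    Map<String, String> fieldMapping; // Excel column -> SPI request field
    String spiInterface;              // generic Dubbo interface to call
    String spiMethod;
}

class ConfigDrivenProcessor {
    // Maps one parsed Excel row to an SPI request payload using the
    // configured field mapping; columns without a mapping are dropped.
    static Map<String, Object> mapRow(Map<String, String> row, TemplateConfig cfg) {
        Map<String, Object> request = new HashMap<>();
        for (Map.Entry<String, String> e : cfg.fieldMapping.entrySet()) {
            if (row.containsKey(e.getKey())) {
                request.put(e.getValue(), row.get(e.getKey()));
            }
        }
        return request;
    }
}
```

The resulting request map is what the generic Dubbo helper above would pass as `inputParam`, so onboarding a new scenario becomes a configuration change plus an SPI implementation on the business side.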

Localized: Task Reporting – In the final stage, the batch system becomes a thin presentation layer. Business services perform file parsing (EasyExcel), upload (OSS SDK), and report task status to the batch center, which only displays results. This reduces coupling and improves resource utilization.
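A minimal sketch of the reporting model, assuming a hypothetical `BatchCenterClient` interface (the article does not specify the reporting API): the business service owns parsing and execution, and the batch center only receives status updates.

```java
import java.util.List;

// Assumed reporting API: the batch center exposes a status endpoint that
// business services call; the exact signature is an illustration.
interface BatchCenterClient {
    void report(String taskId, String status, int processed, int total);
}

// A business-side job: parses the file locally (e.g. with EasyExcel),
// applies its own logic, and reports progress to the batch center.
class LocalBatchJob {
    private final BatchCenterClient client;

    LocalBatchJob(BatchCenterClient client) { this.client = client; }

    void run(String taskId, List<String> rows) {
        client.report(taskId, "RUNNING", 0, rows.size());
        int done = 0;
        for (String row : rows) {
            // parse row and apply business logic here
            done++;
        }
        client.report(taskId, "SUCCESS", done, rows.size());
    }
}
```

Because heavy work (parsing, OSS upload, business writes) now runs on the business service's own machines, the batch center no longer concentrates memory and CPU load for every tenant.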

Throughout the evolution, Xiao Wang encounters performance problems such as memory pressure and OOM errors during peak loads. Simple rate limiting proves insufficient, so he moves to asynchronous scheduling and weighs priority queues, aging strategies, and multi-level queues. The chosen solution combines multi-level queues with thread-pool isolation, implemented as follows:

@Service
@Slf4j
public class TaskScheduleServiceImpl implements TaskScheduleService {
    @Override
    @LogAnnotation
    public void schedule(int shared, int all) {
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        List<Long> highTaskIds = taskInstanceRepository.queryUnstartedTaskIdByPriority(TaskPriorityEnum.HIGH, all * arkConfig.highSize);
        highTaskIds = highTaskIds.stream().filter(id -> id % all == shared).collect(Collectors.toList());
        log.info("High priority task IDs = {}", highTaskIds);
        process(highTaskIds, id -> taskThreadPool.executeHigh(() -> process(id)));
        // medium and low priority similar
        log.info("Scheduling completed, cost = {}", stopWatch.getTime());
    }

    private void process(List<Long> idList, Consumer<Long> consumer) {
        if (CollectionUtils.isEmpty(idList)) return;
        for (Long id : idList) {
            consumer.accept(id);
        }
    }

    private void process(Long id) {
        // task handling logic ...
    }
}
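The `taskThreadPool` used by the scheduler is not shown in the article; the sketch below is one plausible shape for it, with illustrative pool sizes. The point of the isolation is that each priority level gets its own bounded pool, so a flood of low-priority tasks cannot starve high-priority ones.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical thread-pool isolation: one bounded executor per priority
// level; sizes are illustrative, not from the article.
class TaskThreadPool {
    private final ExecutorService highPool = Executors.newFixedThreadPool(8);
    private final ExecutorService mediumPool = Executors.newFixedThreadPool(4);
    private final ExecutorService lowPool = Executors.newFixedThreadPool(2);

    void executeHigh(Runnable task)   { highPool.submit(task); }
    void executeMedium(Runnable task) { mediumPool.submit(task); }
    void executeLow(Runnable task)    { lowPool.submit(task); }

    // Drains all three pools; returns true if everything finished in time.
    boolean shutdownAndAwait(long seconds) {
        highPool.shutdown();
        mediumPool.shutdown();
        lowPool.shutdown();
        try {
            return highPool.awaitTermination(seconds, TimeUnit.SECONDS)
                && mediumPool.awaitTermination(seconds, TimeUnit.SECONDS)
                && lowPool.awaitTermination(seconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Fixed-size pools give a hard cap on concurrent task executions per priority, which is what keeps peak-load memory pressure bounded on each machine.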

Additional refinements include task sharding to limit per‑machine load and environment‑aware routing for testing (coloring). The article concludes with a summary that the batch system’s architecture has iterated from tightly coupled extensions to configurable SPI and finally to a lightweight reporting model, each step driven by scalability, maintainability, and isolation requirements.
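The sharding refinement already appears in miniature in the scheduler above (`id % all == shared`); isolated as its own helper, it is simply a modulo filter over task IDs, where each instance claims only the IDs matching its shard index. The class and parameter names here are illustrative.

```java
import java.util.List;
import java.util.stream.Collectors;

// Minimal sketch of task sharding: each scheduler instance claims only the
// task IDs that map to its shard index, capping per-machine load.
class TaskSharder {
    static List<Long> shard(List<Long> taskIds, int shardIndex, int shardTotal) {
        return taskIds.stream()
                .filter(id -> id % shardTotal == shardIndex)
                .collect(Collectors.toList());
    }
}
```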

Tags: Java, backend development, batch processing, configuration, system design, task scheduling
Written by Wukong Talks Architecture

Explaining distributed systems and architecture through stories. Author of the "JVM Performance Tuning in Practice" column, open-source author of "Spring Cloud in Practice PassJava", and independently developed a PMP practice quiz mini-program.
