Getting Started with the XXL‑JOB Distributed Task Scheduling Framework

This article provides a comprehensive guide to the XXL‑JOB distributed task scheduling framework, covering its architecture, server and executor deployment, task creation via annotations, API and sharding, execution steps, logging, and the underlying Netty‑based communication design with code examples.

Code Ape Tech Column
Code Ape Tech Column
Code Ape Tech Column
Getting Started with the XXL‑JOB Distributed Task Scheduling Framework

Project Introduction

XXL‑JOB is a lightweight open‑source distributed scheduling framework that consists of a management console and executor nodes. The console handles task configuration and log viewing, while executors run the actual job logic after connecting to the console.

Server Deployment

Clone the project from https://github.com/xuxueli/xxl-job, import the SQL script doc/db/table_xxl_job.sql into MySQL, and create a new Spring Boot project copying the xxl-job-admin sources and pom.xml. Adjust the database connection in application.properties and modify logback.xml for log paths. Start the application and access http://localhost:8080/xxl-job-admin/ (default admin: admin/123456).

Executor Deployment

Create a new module similar to the server, add the xxl-job-core dependency, and configure logback.xml. Two separate configuration files (e.g., application-9998.properties and application-9999.properties) illustrate how different executor ports can be registered for high‑availability load balancing.

Task Development

3.1 Annotation‑Based Tasks

Define a Java class and annotate methods with @XxlJob to turn them into scheduled jobs.

3.2 API‑Based Tasks

Use the provided API to submit jobs programmatically.

3.3 Sharding/Broadcast Tasks

Configure sharding parameters so that each executor processes a distinct slice of the workload, similar to Kafka consumer groups.

Task Execution

4.1 Single Task Execution

Create a job with a round‑robin routing strategy, specify a cron expression, and set the handler name (e.g., myJobAnnotationHandler) which corresponds to a Spring bean.

4.2 Sub‑Task Execution

Update a job to include sub‑task IDs (comma‑separated) and trigger execution from the console.

4.3 Sharding/Broadcast Execution

When a sharding job runs, each registered executor receives the task, enabling parallel processing of the same logical job.

Task Logging

XXL‑JOB records detailed execution logs, including parent‑child relationships, parameters, and console output generated by XxlJobLogger.

Communication Layer

The framework uses Netty HTTP for communication (though Mina, Jetty, and Netty TCP are also supported). The communication flow involves the scheduler invoking the executor via a dynamic proxy.

Design Highlights

1. Dynamic Proxy Pattern

Interfaces ExecutorBiz and AdminBiz define remote operations; XxlRpcReferenceBean creates proxy instances that handle the actual network calls.

2. Full Asynchronous Processing

Executors deserialize incoming requests and place them into a LinkedBlockingQueue. Worker threads consume the queue and execute jobs, while results are sent back asynchronously, reducing Netty worker thread load and increasing throughput.

3. Synchronous Wrapper for Asynchronous Calls

Scheduler code appears synchronous, but under the hood it waits for a Future response. Example of the synchronous wrapper:

public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
    ReturnT<String> runResult = null;
    try{
        ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
        // many async steps, finally get sync result
        runResult = executorBiz.run(triggerParam);
    } catch (Exception e) {
        logger.error(">>>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
        runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
    }
    StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
    runResultSB.append("<br>address:").append(address);
    runResultSB.append("<br>code:").append(runResult.getCode());
    runResultSB.append("<br>msg:").append(runResult.getMsg());
    runResult.setMsg(runResultSB.toString());
    return runResult;
}

Dynamic Proxy Invocation

if (CallType.SYNC == callType) {
    XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, null);
    try {
        client.asyncSend(finalAddress, xxlRpcRequest);
        XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, TimeUnit.MILLISECONDS);
        if (xxlRpcResponse.getErrorMsg() != null) {
            throw new XxlRpcException(xxlRpcResponse.getErrorMsg());
        }
        return xxlRpcResponse.getResult();
    } catch (Exception e) {
        logger.info(">>>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);
        throw (e instanceof XxlRpcException) ? e : new XxlRpcException(e);
    } finally {
        futureResponse.removeInvokerFuture();
    }
}

Future Response Handling

public void setResponse(XxlRpcResponse response) {
    this.response = response;
    synchronized (lock) {
        done = true;
        lock.notifyAll();
    }
}

@Override
public XxlRpcResponse get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    if (!done) {
        synchronized (lock) {
            try {
                if (timeout < 0) {
                    lock.wait();
                } else {
                    long timeoutMillis = (TimeUnit.MILLISECONDS == unit) ? timeout : TimeUnit.MILLISECONDS.convert(timeout, unit);
                    lock.wait(timeoutMillis);
                }
            } catch (InterruptedException e) {
                throw e;
            }
        }
    }
    if (!done) {
        throw new XxlRpcException("xxl-rpc, request timeout at:" + System.currentTimeMillis() + ", request:" + request.toString());
    }
    return response;
}

Notifier for Completed Calls

public void notifyInvokerFuture(String requestId, final XxlRpcResponse xxlRpcResponse) {
    final XxlRpcFutureResponse futureResponse = futureResponsePool.get(requestId);
    if (futureResponse == null) {
        return;
    }
    if (futureResponse.getInvokeCallback() != null) {
        try {
            executeResponseCallback(new Runnable() {
                @Override
                public void run() {
                    if (xxlRpcResponse.getErrorMsg() != null) {
                        futureResponse.getInvokeCallback().onFailure(new XxlRpcException(xxlRpcResponse.getErrorMsg()));
                    } else {
                        futureResponse.getInvokeCallback().onSuccess(xxlRpcResponse.getResult());
                    }
                }
            });
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    } else {
        futureResponse.setResponse(xxlRpcResponse);
    }
    futureResponsePool.remove(requestId);
}

Each remote call carries a UUID request ID, which acts as a key to locate the corresponding XxlRpcFutureResponse and wake the waiting thread.

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

Code Ape Tech Column
Written by

Code Ape Tech Column

Former Ant Group P8 engineer, pure technologist, sharing full‑stack Java, job interview and career advice through a column. Site: java-family.cn

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.