Getting Started with the XXL‑JOB Distributed Task Scheduling Framework
This article is a guide to the XXL‑JOB distributed task scheduling framework. It covers the architecture, server and executor deployment, task creation (annotation‑based, API‑based, and sharding), task execution, logging, and the underlying Netty‑based communication design, illustrated with code excerpts.
Project Introduction
XXL‑JOB is a lightweight open‑source distributed scheduling framework that consists of a management console and executor nodes. The console handles task configuration and log viewing, while executors register with the console and run the actual job logic when it triggers them.
Server Deployment
Clone the project from https://github.com/xuxueli/xxl-job and import the SQL script doc/db/tables_xxl_job.sql into MySQL. Create a new Spring Boot project and copy the xxl-job-admin sources and pom.xml into it. Set the database connection in application.properties and adjust the log path in logback.xml. Start the application and open http://localhost:8080/xxl-job-admin/ (default login: admin/123456).
Executor Deployment
Create a new module in the same way as for the server, add the xxl-job-core dependency, and configure logback.xml. To run two executor instances for high availability and load balancing, give each its own configuration file (e.g., application-9998.properties and application-9999.properties) so that each instance registers on a different port.
Task Development
3.1 Annotation‑Based Tasks
Define a Java class and annotate methods with @XxlJob to turn them into scheduled jobs.
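To make the idea concrete, here is a minimal sketch of how an annotation‑driven registry could map handler names to methods. It is not XXL‑JOB's implementation: the @JobName annotation is a local stand‑in for @XxlJob so that the example compiles without xxl-job-core, and SampleJobs, HandlerRegistrySketch, and the method signatures are hypothetical.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

class HandlerRegistrySketch {

    // Local stand-in for @XxlJob (assumption: only the handler name matters here).
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface JobName {
        String value();
    }

    // Hypothetical job class; in a real executor this would be a Spring bean.
    static class SampleJobs {
        @JobName("myJobAnnotationHandler")
        public String greet(String param) {
            return "handled:" + param;
        }
    }

    private final Map<String, Method> methods = new HashMap<>();
    private final Map<String, Object> targets = new HashMap<>();

    // Scan a bean and index every annotated method by its handler name.
    void register(Object bean) {
        for (Method m : bean.getClass().getDeclaredMethods()) {
            JobName ann = m.getAnnotation(JobName.class);
            if (ann != null) {
                methods.put(ann.value(), m);
                targets.put(ann.value(), bean);
            }
        }
    }

    // Look up a handler by name and invoke it reflectively.
    String run(String handlerName, String param) {
        Method m = methods.get(handlerName);
        if (m == null) {
            throw new IllegalArgumentException("no handler named " + handlerName);
        }
        try {
            return (String) m.invoke(targets.get(handlerName), param);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("handler failed: " + handlerName, e);
        }
    }

    public static void main(String[] args) {
        HandlerRegistrySketch registry = new HandlerRegistrySketch();
        registry.register(new SampleJobs());
        System.out.println(registry.run("myJobAnnotationHandler", "42")); // prints "handled:42"
    }
}
```

The handler name configured in the console plays the role of the map key here, which is why it has to match the name declared on the annotated method.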
3.2 API‑Based Tasks
Use the provided API to submit jobs programmatically.
3.3 Sharding/Broadcast Tasks
Configure sharding parameters so that each executor processes a distinct slice of the workload, much as each consumer in a Kafka consumer group takes a subset of the partitions.
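One common way to turn a shard index and shard total into a distinct slice is a modulo split. The sketch below assumes the framework hands each executor its own index and the total at run time; here they are plain method parameters, and ShardingSketch and slice are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ShardingSketch {

    // Returns the item IDs that the executor with the given shard index should process.
    static List<Integer> slice(List<Integer> ids, int shardIndex, int shardTotal) {
        if (shardTotal <= 0 || shardIndex < 0 || shardIndex >= shardTotal) {
            throw new IllegalArgumentException("invalid shard " + shardIndex + "/" + shardTotal);
        }
        List<Integer> mine = new ArrayList<>();
        for (int id : ids) {
            // floorMod keeps the result non-negative even for negative IDs.
            if (Math.floorMod(id, shardTotal) == shardIndex) {
                mine.add(id);
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        List<Integer> ids = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
        for (int index = 0; index < 3; index++) {
            System.out.println("shard " + index + " -> " + slice(ids, index, 3));
        }
        // prints:
        // shard 0 -> [0, 3, 6, 9]
        // shard 1 -> [1, 4, 7]
        // shard 2 -> [2, 5, 8]
    }
}
```

Every ID lands in exactly one slice, so adding an executor changes the split without any coordination between executors.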
Task Execution
4.1 Single Task Execution
Create a job with a round‑robin routing strategy, specify a cron expression, and set the handler name (e.g., myJobAnnotationHandler), which must match a handler registered by a Spring bean in the executor.
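Round‑robin routing simply cycles through the registered executor addresses. The following is a minimal sketch of that idea, not the framework's router: RoundRobinRouterSketch is a hypothetical class, and the host part of the addresses is an assumption (only the ports 9998 and 9999 come from the deployment step).

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class RoundRobinRouterSketch {

    // Shared counter so that concurrent triggers still rotate through the list.
    private final AtomicInteger counter = new AtomicInteger();

    // Picks the next executor address in rotation.
    String route(List<String> addresses) {
        if (addresses == null || addresses.isEmpty()) {
            throw new IllegalArgumentException("no executor addresses registered");
        }
        // floorMod guards against the counter overflowing into negative values.
        int index = Math.floorMod(counter.getAndIncrement(), addresses.size());
        return addresses.get(index);
    }

    public static void main(String[] args) {
        RoundRobinRouterSketch router = new RoundRobinRouterSketch();
        List<String> executors = Arrays.asList("127.0.0.1:9998", "127.0.0.1:9999");
        for (int i = 0; i < 3; i++) {
            System.out.println(router.route(executors));
        }
        // prints 127.0.0.1:9998, then 127.0.0.1:9999, then 127.0.0.1:9998
    }
}
```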
4.2 Sub‑Task Execution
Update a job to include sub‑task IDs (comma‑separated) and trigger execution from the console.
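As an illustration of the comma‑separated format, the sketch below parses a list of sub‑task IDs and returns the ones to trigger once the parent has finished successfully. ChildJobSketch and its methods are hypothetical helpers, and the actual triggering is left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class ChildJobSketch {

    // Parses "2, 3,4" into [2, 3, 4], ignoring blanks between commas.
    static List<Integer> parseChildIds(String csv) {
        List<Integer> ids = new ArrayList<>();
        if (csv == null) {
            return ids;
        }
        for (String part : csv.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                ids.add(Integer.parseInt(trimmed));
            }
        }
        return ids;
    }

    // Sub-tasks are only chained when the parent run succeeded.
    static List<Integer> childrenToTrigger(boolean parentSucceeded, String csv) {
        return parentSucceeded ? parseChildIds(csv) : Collections.emptyList();
    }

    public static void main(String[] args) {
        System.out.println(childrenToTrigger(true, "2, 3,4"));  // prints [2, 3, 4]
        System.out.println(childrenToTrigger(false, "2, 3,4")); // prints []
    }
}
```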
4.3 Sharding/Broadcast Execution
When a sharding job runs, each registered executor receives the task, enabling parallel processing of the same logical job.
Task Logging
XXL‑JOB records detailed execution logs, including parent‑child relationships, parameters, and the messages that job code writes through XxlJobLogger.
Communication Layer
The framework communicates over Netty HTTP by default; Mina, Jetty, and Netty TCP are also supported. In the communication flow, the scheduler invokes the executor through a dynamic proxy.
Design Highlights
1. Dynamic Proxy Pattern
Interfaces ExecutorBiz and AdminBiz define remote operations; XxlRpcReferenceBean creates proxy instances that handle the actual network calls.
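The pattern can be sketched with the JDK's java.lang.reflect.Proxy. This is an illustration under stated assumptions, not the code of XxlRpcReferenceBean: RemoteBiz stands in for an interface such as ExecutorBiz, and Transport is a hypothetical hook where a real client would serialize the call and send it over the network.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

class DynamicProxySketch {

    // Hypothetical remote interface, standing in for something like ExecutorBiz.
    interface RemoteBiz {
        String run(String jobParam);
    }

    // Hypothetical transport; a real one would serialize the request and do network I/O.
    interface Transport {
        String send(String address, String methodName, Object[] args);
    }

    // Builds a proxy whose interface methods are forwarded to the transport.
    static <T> T createProxy(Class<T> iface, String address, Transport transport) {
        InvocationHandler handler = (proxy, method, args) -> {
            // toString/hashCode/equals are answered locally, never sent remotely.
            if (method.getDeclaringClass() == Object.class) {
                switch (method.getName()) {
                    case "toString": return iface.getSimpleName() + "@" + address;
                    case "hashCode": return System.identityHashCode(proxy);
                    default:         return proxy == args[0]; // equals
                }
            }
            // Assumption: every interface method returns String, as send() does.
            return transport.send(address, method.getName(), args);
        };
        Object proxy = Proxy.newProxyInstance(
                iface.getClassLoader(), new Class<?>[] {iface}, handler);
        return iface.cast(proxy);
    }

    public static void main(String[] args) {
        Transport fake = (address, name, callArgs) ->
                address + "/" + name + "(" + callArgs[0] + ")";
        RemoteBiz biz = createProxy(RemoteBiz.class, "127.0.0.1:9999", fake);
        System.out.println(biz.run("p1")); // prints "127.0.0.1:9999/run(p1)"
    }
}
```

The caller only sees the RemoteBiz interface, so the network details can change without touching the scheduling code.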
2. Full Asynchronous Processing
Executors deserialize incoming requests and place them into a LinkedBlockingQueue. Worker threads consume the queue and execute jobs, while results are sent back asynchronously, reducing Netty worker thread load and increasing throughput.
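The hand‑off can be sketched with two queues and one worker thread. This is a simplified illustration rather than the executor's actual code: AsyncQueueSketch is a hypothetical class, job parameters are plain strings, and the "callback" is just a second queue instead of a network call back to the scheduler.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class AsyncQueueSketch {

    private final BlockingQueue<String> triggerQueue = new LinkedBlockingQueue<>();
    private final BlockingQueue<String> callbackQueue = new LinkedBlockingQueue<>();

    // Starts the worker that drains the trigger queue and reports results.
    void start() {
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    String param = triggerQueue.take();   // blocks until a job arrives
                    callbackQueue.put("done:" + param);   // stand-in for the async callback
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();       // exit quietly on interrupt
            }
        }, "job-worker");
        worker.setDaemon(true);
        worker.start();
    }

    // Called from the I/O thread: enqueues the request and returns immediately.
    boolean accept(String param) {
        return triggerQueue.offer(param);
    }

    // Waits for the next result; returns null on timeout or interrupt.
    String awaitResult(long timeoutMillis) {
        try {
            return callbackQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        AsyncQueueSketch executor = new AsyncQueueSketch();
        executor.start();
        executor.accept("job-1");
        System.out.println(executor.awaitResult(2000)); // prints "done:job-1"
    }
}
```

Because accept only enqueues, the thread that received the request is free again right away, which is the effect the article attributes to the queue‑based design.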
3. Synchronous Wrapper for Asynchronous Calls
Scheduler code appears synchronous, but under the hood it waits for a Future response. Example of the synchronous wrapper:
public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address) {
    ReturnT<String> runResult = null;
    try {
        ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
        // many async steps, finally get sync result
        runResult = executorBiz.run(triggerParam);
    } catch (Exception e) {
        logger.error(">>>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
        runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
    }

    // append the address, code, and message to the result shown in the trigger log
    StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
    runResultSB.append("<br>address:").append(address);
    runResultSB.append("<br>code:").append(runResult.getCode());
    runResultSB.append("<br>msg:").append(runResult.getMsg());
    runResult.setMsg(runResultSB.toString());
    return runResult;
}
Dynamic Proxy Invocation
if (CallType.SYNC == callType) {
    // register a future for this request before sending it
    XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, null);
    try {
        // send asynchronously, then block until the response arrives or the timeout expires
        client.asyncSend(finalAddress, xxlRpcRequest);
        XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, TimeUnit.MILLISECONDS);
        if (xxlRpcResponse.getErrorMsg() != null) {
            throw new XxlRpcException(xxlRpcResponse.getErrorMsg());
        }
        return xxlRpcResponse.getResult();
    } catch (Exception e) {
        logger.info(">>>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);
        throw (e instanceof XxlRpcException) ? e : new XxlRpcException(e);
    } finally {
        // always drop the pending future so it does not leak
        futureResponse.removeInvokerFuture();
    }
}
Future Response Handling
public void setResponse(XxlRpcResponse response) {
    this.response = response;
    synchronized (lock) {
        done = true;
        lock.notifyAll();
    }
}

@Override
public XxlRpcResponse get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    if (!done) {
        synchronized (lock) {
            try {
                if (timeout < 0) {
                    lock.wait();
                } else {
                    long timeoutMillis = (TimeUnit.MILLISECONDS == unit) ? timeout : TimeUnit.MILLISECONDS.convert(timeout, unit);
                    lock.wait(timeoutMillis);
                }
            } catch (InterruptedException e) {
                throw e;
            }
        }
    }

    if (!done) {
        throw new XxlRpcException("xxl-rpc, request timeout at:" + System.currentTimeMillis() + ", request:" + request.toString());
    }
    return response;
}
Notifier for Completed Calls
public void notifyInvokerFuture(String requestId, final XxlRpcResponse xxlRpcResponse) {
    // find the pending future registered under this request ID
    final XxlRpcFutureResponse futureResponse = futureResponsePool.get(requestId);
    if (futureResponse == null) {
        return;
    }

    if (futureResponse.getInvokeCallback() != null) {
        // callback style: run the callback on a separate executor
        try {
            executeResponseCallback(new Runnable() {
                @Override
                public void run() {
                    if (xxlRpcResponse.getErrorMsg() != null) {
                        futureResponse.getInvokeCallback().onFailure(new XxlRpcException(xxlRpcResponse.getErrorMsg()));
                    } else {
                        futureResponse.getInvokeCallback().onSuccess(xxlRpcResponse.getResult());
                    }
                }
            });
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    } else {
        // future style: store the response and wake the waiting thread
        futureResponse.setResponse(xxlRpcResponse);
    }

    futureResponsePool.remove(requestId);
}
Each remote call carries a UUID request ID, which acts as a key to locate the corresponding XxlRpcFutureResponse and wake the waiting thread.
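The request‑ID correlation can be sketched in a few lines with the JDK's CompletableFuture. This is a simplified illustration, not the xxl-rpc implementation: PendingCallsSketch, callSync, and onResponse are hypothetical names, results are plain strings, and CompletableFuture is substituted for the hand‑written wait/notify shown in the excerpts.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

class PendingCallsSketch {

    // Pending futures keyed by request ID, shared by caller and I/O threads.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Looks synchronous to the caller: sends asynchronously, then blocks on the future.
    String callSync(Consumer<String> asyncSend, long timeoutMillis) {
        String requestId = UUID.randomUUID().toString();
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(requestId, future);          // register before sending
        try {
            asyncSend.accept(requestId);         // hypothetical transport; returns immediately
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("call failed or timed out: " + requestId, e);
        } finally {
            pending.remove(requestId);           // never leak a pending entry
        }
    }

    // Called by the I/O thread when a response with this request ID arrives.
    void onResponse(String requestId, String result) {
        CompletableFuture<String> future = pending.get(requestId);
        if (future != null) {
            future.complete(result);             // wakes the thread blocked in callSync
        }
    }

    public static void main(String[] args) {
        PendingCallsSketch calls = new PendingCallsSketch();
        // The fake transport answers from another thread, echoing the request ID.
        String result = calls.callSync(
                id -> new Thread(() -> calls.onResponse(id, "reply-to:" + id)).start(), 2000);
        System.out.println(result);
    }
}
```

Responses for unknown or already timed‑out request IDs are simply ignored, and the future takes care of the locking and the completion flag that the excerpts manage by hand.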
Code Ape Tech Column
Former Ant Group P8 engineer, pure technologist, sharing full‑stack Java, job interview and career advice through a column. Site: java-family.cn