Stunning Architecture Designs Inside XXL-JOB Explained
The article dissects XXL-JOB's communication layer, showcasing Netty HTTP usage, a full asynchronous processing pipeline, dynamic proxy abstraction, request‑ID based thread wake‑up, and detailed Java code examples that illustrate how the scheduler and executor achieve high‑throughput remote job execution.
Communication Layer Overview
XXL‑JOB uses Netty HTTP for communication; although Mina, Jetty, and Netty TCP are supported, the implementation hard‑codes Netty HTTP.
Overall Communication Flow
When the scheduler notifies an executor to run a task, the activity diagram shows the sequence of messages.
Key Design Points
Dynamic proxy hides communication details
XXL‑JOB defines two interfaces ExecutorBiz and AdminBiz. They encapsulate heartbeat, pause, trigger, callback, registration, and deregistration operations. Their implementation classes contain no communication logic. XxlRpcReferenceBean creates a proxy via its getObject() method; the proxy performs remote calls.
Fully asynchronous processing
Upon receiving a message, the executor deserializes it and stores the task in a LinkedBlockingQueue. Worker threads pull tasks from the queue and execute them. The result is placed into a callback thread’s blocking queue, allowing asynchronous return and reducing Netty worker thread time, thereby increasing throughput.
Asynchronous wrapper appears synchronous
The scheduler’s XxlJobTrigger.runExecutor method invokes ExecutorBiz.run through the dynamic proxy. Although the underlying processing is asynchronous, the method blocks until the result is returned, then constructs a ReturnT<String> with address, code, and message.
public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address) {
ReturnT<String> runResult = null;
try {
ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
// many async steps, finally synchronous result
runResult = executorBiz.run(triggerParam);
} catch (Exception e) {
logger.error(">>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
}
StringBuffer runResultSB = newStringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
runResultSB.append("<br>address:" + address);
runResultSB.append("<br>code:" + runResult.getCode());
runResultSB.append("<br>msg:" + runResult.getMsg());
runResult.setMsg(runResultSB.toString());
return runResult;
}Dynamic proxy implementation
For synchronous calls, the proxy creates an XxlRpcFutureResponse, sends the request asynchronously, then blocks on futureResponse.get(timeout, TimeUnit.MILLISECONDS). If an error message is present, an XxlRpcException is thrown; otherwise the result is returned. The future is removed in a finally block.
if (CallType.SYNC == callType) {
XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, null);
try {
client.asyncSend(finalAddress, xxlRpcRequest);
XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, TimeUnit.MILLISECONDS);
if (xxlRpcResponse.getErrorMsg() != null) {
throw new XxlRpcException(xxlRpcResponse.getErrorMsg());
}
return xxlRpcResponse.getResult();
} catch (Exception e) {
logger.info(">>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);
throw (e instanceof XxlRpcException) ? e : new XxlRpcException(e);
} finally {
futureResponse.removeInvokerFuture();
}
}Future response handling
XxlRpcFutureResponsestores the response and notifies waiting threads.
public void setResponse(XxlRpcResponse response) {
this.response = response;
synchronized (lock) {
done = true;
lock.notifyAll();
}
}
public XxlRpcResponse get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
if (!done) {
synchronized (lock) {
try {
if (timeout < 0) {
lock.wait();
} else {
long timeoutMillis = (TimeUnit.MILLISECONDS == unit) ? timeout : TimeUnit.MILLISECONDS.convert(timeout, unit);
lock.wait(timeoutMillis);
}
} catch (InterruptedException e) {
throw e;
}
}
}
if (!done) {
throw new XxlRpcException("xxl-rpc, request timeout at:" + System.currentTimeMillis() + ", request:" + request.toString());
}
return response;
}Each remote call generates a UUID request ID that travels with the call. The scheduler uses this ID to locate the corresponding XxlRpcFutureResponse and invoke setResponse, waking the blocked thread.
public void notifyInvokerFuture(String requestId, final XxlRpcResponse xxlRpcResponse) {
XxlRpcFutureResponse futureResponse = futureResponsePool.get(requestId);
if (futureResponse == null) return;
if (futureResponse.getInvokeCallback() != null) {
try {
executeResponseCallback(new Runnable() {
@Override
public void run() {
if (xxlRpcResponse.getErrorMsg() != null) {
futureResponse.getInvokeCallback().onFailure(new XxlRpcException(xxlRpcResponse.getErrorMsg()));
} else {
futureResponse.getInvokeCallback().onSuccess(xxlRpcResponse.getResult());
}
}
});
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
} else {
futureResponse.setResponse(xxlRpcResponse);
}
futureResponsePool.remove(requestId);
}How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
