Deep Dive into XXL‑Job Communication Architecture and Asynchronous Design
This article explains the communication architecture of the XXL‑Job scheduler, detailing its Netty‑based transport layer, the overall task dispatch flow, and the clever design choices such as dynamic proxies, full asynchronous processing, and response wrapping that improve throughput and simplify remote calls.
1. Communication Layer Overview
The xxl-job framework uses netty http for communication, although it also supports Mina, Jetty, and netty tcp . In the source code the transport method is hard‑coded to netty http .
2. Overall Communication Process
The article illustrates the activity diagram of a scheduler notifying an executor to run a task.
3. Impressive Design
After reviewing the processing flow, several design highlights stand out, especially the use of Netty and multithreading.
1. Using Dynamic Proxy to Hide Communication Details
XXL‑Job defines two interfaces, ExecutorBiz and AdminBiz , which encapsulate heartbeat, pause, trigger, registration, and cancellation operations. Their implementations contain no communication logic. The XxlRpcReferenceBean creates a proxy object whose getObject() method performs remote calls.
2. Full Asynchronous Processing
When the executor receives a message, it deserializes it and stores the task information in a LinkedBlockingQueue . A separate thread fetches tasks from this queue for execution. The result is placed into a callback thread’s blocking queue, allowing the Netty worker thread to finish quickly and increasing throughput.
3. Wrapping Asynchronous Processing
The asynchronous handling is wrapped so that the code appears to be a synchronous call.
Scheduler side code (class XxlJobTrigger ):
public static ReturnT
runExecutor(TriggerParam triggerParam, String address) {
ReturnT
runResult = null;
try {
ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
// many async steps, finally get result synchronously
runResult = executorBiz.run(triggerParam);
} catch (Exception e) {
logger.error(">>>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
runResult = new ReturnT
(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
}
StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
runResultSB.append("
address:" + address);
runResultSB.append("
code:" + runResult.getCode());
runResultSB.append("
msg:" + runResult.getMsg());
runResult.setMsg(runResultSB.toString());
return runResult;
}The ExecutorBiz.run method is invoked via the dynamic proxy, and the executor returns the result after asynchronous processing, while the caller waits synchronously.
Dynamic proxy implementation:
// Proxy invocation
if (CallType.SYNC == callType) {
XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, null);
try {
client.asyncSend(finalAddress, xxlRpcRequest);
XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, TimeUnit.MILLISECONDS);
if (xxlRpcResponse.getErrorMsg() != null) {
throw new XxlRpcException(xxlRpcResponse.getErrorMsg());
}
return xxlRpcResponse.getResult();
} catch (Exception e) {
logger.info(">>>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);
throw (e instanceof XxlRpcException) ? e : new XxlRpcException(e);
} finally {
futureResponse.removeInvokerFuture();
}
}The class XxlRpcFutureResponse implements thread waiting and notification:
public void setResponse(XxlRpcResponse response) {
this.response = response;
synchronized (lock) {
done = true;
lock.notifyAll();
}
}
@Override
public XxlRpcResponse get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
if (!done) {
synchronized (lock) {
try {
if (timeout < 0) {
lock.wait();
} else {
long timeoutMillis = (TimeUnit.MILLISECONDS == unit) ? timeout : TimeUnit.MILLISECONDS.convert(timeout, unit);
lock.wait(timeoutMillis);
}
} catch (InterruptedException e) {
throw e;
}
}
}
if (!done) {
throw new XxlRpcException("xxl-rpc, request timeout at:" + System.currentTimeMillis() + ", request:" + request.toString());
}
return response;
}Each remote call generates a UUID request ID that travels with the call, acting as a key to locate the corresponding XxlRpcFutureResponse and wake the waiting thread.
Notifier implementation:
public void notifyInvokerFuture(String requestId, final XxlRpcResponse xxlRpcResponse) {
final XxlRpcFutureResponse futureResponse = futureResponsePool.get(requestId);
if (futureResponse == null) {
return;
}
if (futureResponse.getInvokeCallback() != null) {
try {
executeResponseCallback(new Runnable() {
@Override
public void run() {
if (xxlRpcResponse.getErrorMsg() != null) {
futureResponse.getInvokeCallback().onFailure(new XxlRpcException(xxlRpcResponse.getErrorMsg()));
} else {
futureResponse.getInvokeCallback().onSuccess(xxlRpcResponse.getResult());
}
}
});
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
} else {
futureResponse.setResponse(xxlRpcResponse);
}
futureResponsePool.remove(requestId);
}Selected Java Interview Questions
A professional Java tech channel sharing common knowledge to help developers fill gaps. Follow us!
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.