How xxl-job Leverages Netty and Dynamic Proxies for High‑Performance RPC
xxl-job uses Netty HTTP for communication, employing dynamic proxy patterns, full asynchronous processing, and thread‑blocking mechanisms to efficiently dispatch tasks from the scheduler to executors, with detailed code examples illustrating request IDs, future responses, and synchronized result retrieval.
Communication Layer Overview
xxl-job uses Netty HTTP for communication, although it also supports Mina, Jetty, and Netty TCP; the implementation hard‑codes Netty HTTP.
Overall Communication Process
An activity diagram (shown below) illustrates how the scheduler notifies an executor to run a task.
Key Design Highlights
Dynamic Proxy to Hide Communication Details
xxl-job defines two interfaces, ExecutorBiz and AdminBiz , which encapsulate heartbeat, pause, trigger, registration, and callback operations. Their implementations contain no networking code. XxlRpcReferenceBean.getObject() generates a proxy class that performs the remote RPC.
Full Asynchronous Processing
The executor receives a message, deserializes it, and stores the task information in a LinkedBlockingQueue. Worker threads pull tasks from this queue and execute them. The processing result is placed in a callback queue, reducing Netty worker‑thread time and increasing overall throughput.
Asynchronous Wrapper for Synchronous‑Style Calls
Although the code appears synchronous, the underlying calls are asynchronous. The XxlRpcFutureResponse object blocks the calling thread until the remote response arrives, then returns the result.
public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
ReturnT<String> runResult = null;
try {
ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
// many async steps, finally get sync result
runResult = executorBiz.run(triggerParam);
} catch (Exception e) {
logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
runResult = new ReturnT<>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
}
StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
runResultSB.append("<br>address:").append(address);
runResultSB.append("<br>code:").append(runResult.getCode());
runResultSB.append("<br>msg:").append(runResult.getMsg());
runResult.setMsg(runResultSB.toString());
return runResult;
}The dynamic proxy code (simplified) decides whether to perform a synchronous call:
if (CallType.SYNC == callType) {
XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, null);
try {
client.asyncSend(finalAddress, xxlRpcRequest);
XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, TimeUnit.MILLISECONDS);
if (xxlRpcResponse.getErrorMsg() != null) {
throw new XxlRpcException(xxlRpcResponse.getErrorMsg());
}
return xxlRpcResponse.getResult();
} catch (Exception e) {
logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);
throw (e instanceof XxlRpcException) ? e : new XxlRpcException(e);
} finally {
futureResponse.removeInvokerFuture();
}
} XxlRpcFutureResponseimplements the thread‑waiting and notification logic:
public void setResponse(XxlRpcResponse response) {
this.response = response;
synchronized (lock) {
done = true;
lock.notifyAll();
}
}
@Override
public XxlRpcResponse get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
if (!done) {
synchronized (lock) {
try {
if (timeout < 0) {
lock.wait();
} else {
long timeoutMillis = (TimeUnit.MILLISECONDS == unit) ? timeout : TimeUnit.MILLISECONDS.convert(timeout, unit);
lock.wait(timeoutMillis);
}
} catch (InterruptedException e) {
throw e;
}
}
}
if (!done) {
throw new XxlRpcException("xxl-rpc, request timeout at:" + System.currentTimeMillis() + ", request:" + request.toString());
}
return response;
}Each remote call carries a UUID request ID, which acts as a key to locate the corresponding XxlRpcFutureResponse and wake the blocked thread:
public void notifyInvokerFuture(String requestId, final XxlRpcResponse xxlRpcResponse) {
final XxlRpcFutureResponse futureResponse = futureResponsePool.get(requestId);
if (futureResponse == null) return;
if (futureResponse.getInvokeCallback() != null) {
try {
executeResponseCallback(() -> {
if (xxlRpcResponse.getErrorMsg() != null) {
futureResponse.getInvokeCallback().onFailure(new XxlRpcException(xxlRpcResponse.getErrorMsg()));
} else {
futureResponse.getInvokeCallback().onSuccess(xxlRpcResponse.getResult());
}
});
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
} else {
futureResponse.setResponse(xxlRpcResponse);
}
futureResponsePool.remove(requestId);
}These mechanisms together give xxl‑job a clean, high‑throughput RPC layer while allowing developers to write code that looks synchronous.
Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactand we will review it promptly.
Java Backend Technology
Focus on Java-related technologies: SSM, Spring ecosystem, microservices, MySQL, MyCat, clustering, distributed systems, middleware, Linux, networking, multithreading. Occasionally cover DevOps tools like Jenkins, Nexus, Docker, and ELK. Also share technical insights from time to time, committed to Java full-stack development!
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
