How xxl-job Leverages Netty and Dynamic Proxies for High‑Performance RPC
This article explains how xxl‑job uses Netty HTTP, dynamic proxies, and fully asynchronous processing to implement a high‑throughput RPC mechanism, detailing the communication layer, overall workflow, and the clever design that wraps async calls into synchronous‑looking code.
Communication Layer Overview
Overall Communication Process
Stunning Design
Communication Layer Overview
xxl-job communicates over Netty HTTP. Mina, Jetty, and Netty TCP are also supported, but the code hard‑codes Netty HTTP.
Overall Communication Process
Take the scheduler notifying an executor to run a task as the example. The original article illustrates this flow with an activity diagram (image not reproduced here).
Stunning Design
The implementation cleverly combines Netty and multithreading, making the processing flow smooth.
Key design highlights:
Dynamic Proxy hides communication details
xxl-job defines two interfaces, ExecutorBiz and AdminBiz, which encapsulate operations such as heartbeat, pause, trigger, and registration. Their implementations contain no communication logic. Instead, XxlRpcReferenceBean.getObject() creates a proxy object, and that proxy performs the remote calls.
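To make the proxy idea concrete, here is a minimal sketch built on the JDK's java.lang.reflect.Proxy. It is not xxl-rpc's code: GreetingBiz, ProxySketch, and createProxy are hypothetical names, and instead of serializing a request and sending it over Netty, the handler only returns a string describing the call it would have sent.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Arrays;

// Hypothetical business interface, standing in for something like ExecutorBiz.
interface GreetingBiz {
    String greet(String name);
}

class ProxySketch {

    // Builds a proxy whose every method call is routed through one handler.
    // A real RPC client would serialize the call and send it over the network;
    // this sketch only describes the call so that it can run standalone.
    @SuppressWarnings("unchecked")
    static <T> T createProxy(Class<T> iface, String address) {
        InvocationHandler handler = (proxy, method, args) -> {
            // Methods inherited from Object are answered locally, not remotely.
            if (method.getDeclaringClass() == Object.class) {
                switch (method.getName()) {
                    case "hashCode": return System.identityHashCode(proxy);
                    case "equals":   return proxy == args[0];
                    default:         return iface.getSimpleName() + "@proxy";
                }
            }
            return "remote call to " + address + ": "
                    + iface.getSimpleName() + "." + method.getName()
                    + Arrays.toString(args);
        };
        return (T) Proxy.newProxyInstance(
                iface.getClassLoader(), new Class<?>[]{iface}, handler);
    }

    public static void main(String[] args) {
        GreetingBiz biz = createProxy(GreetingBiz.class, "127.0.0.1:9999");
        // The caller sees an ordinary method call; the handler decides what happens.
        System.out.println(biz.greet("job-1"));
    }
}
```

The caller only ever depends on the interface, so the transport behind the handler can change without touching business code.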
Fully Asynchronous Processing
When the executor receives a message, it deserializes it and stores task information in a LinkedBlockingQueue; a separate thread picks up tasks and executes them. Results are placed into a callback queue and returned asynchronously, reducing Netty worker thread time and increasing throughput.
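The hand-off described above can be sketched with two queues and one worker thread. This is an illustration under assumptions, not xxl-job's implementation: AsyncExecutorSketch, triggerQueue, callbackQueue, and the "done:" result format are all made up for the example, and the I/O thread is simulated by a plain method call.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class AsyncExecutorSketch {
    // Tasks handed over by the I/O thread (illustrative name).
    private final BlockingQueue<String> triggerQueue = new LinkedBlockingQueue<>();
    // Finished results waiting to be reported back (illustrative name).
    private final BlockingQueue<String> callbackQueue = new LinkedBlockingQueue<>();
    private final Thread worker;

    AsyncExecutorSketch() {
        worker = new Thread(() -> {
            try {
                while (true) {
                    String task = triggerQueue.take();   // blocks until a task arrives
                    callbackQueue.put("done:" + task);   // stand-in for running the job
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();      // exit the loop on shutdown
            }
        }, "job-worker");
        worker.setDaemon(true);
        worker.start();
    }

    // Called from the (simulated) I/O thread: enqueue and return immediately.
    boolean onRequest(String task) {
        return triggerQueue.offer(task);
    }

    // Called by a (simulated) callback thread; returns null on timeout or interrupt.
    String nextResult(long timeoutMillis) {
        try {
            return callbackQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    void shutdown() {
        worker.interrupt();
    }

    public static void main(String[] args) {
        AsyncExecutorSketch executor = new AsyncExecutorSketch();
        executor.onRequest("job-42");                    // returns without waiting for the job
        System.out.println(executor.nextResult(1000));
        executor.shutdown();
    }
}
```

Because onRequest only enqueues, the thread that received the message is free again almost immediately, which is the throughput benefit the paragraph describes.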
Wrapping Asynchronous Calls as Synchronous
The code appears to be synchronous thanks to a wrapper around the async processing.
Scheduler side code (XxlJobTrigger.runExecutor) synchronously waits for the result after invoking the remote executor:
    public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address) {
        ReturnT<String> runResult = null;
        try {
            ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
            // many async steps, finally get synchronous result
            runResult = executorBiz.run(triggerParam);
        } catch (Exception e) {
            logger.error(">>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
            runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
        }
        StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
        runResultSB.append("<br>address:").append(address);
        runResultSB.append("<br>code:").append(runResult.getCode());
        runResultSB.append("<br>msg:").append(runResult.getMsg());
        runResult.setMsg(runResultSB.toString());
        return runResult;
    }
ExecutorBiz.run is a dynamic‑proxy call; the scheduler thread blocks until the executor finishes and returns the result, at which point the waiting thread is notified.
The dynamic proxy implementation decides between synchronous and asynchronous call types. For synchronous calls it creates an XxlRpcFutureResponse, sends the request asynchronously, then waits for the response:
    if (CallType.SYNC == callType) {
        XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, null);
        try {
            client.asyncSend(finalAddress, xxlRpcRequest);
            XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, TimeUnit.MILLISECONDS);
            if (xxlRpcResponse.getErrorMsg() != null) {
                throw new XxlRpcException(xxlRpcResponse.getErrorMsg());
            }
            return xxlRpcResponse.getResult();
        } catch (Exception e) {
            logger.info(">>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);
            throw (e instanceof XxlRpcException) ? e : new XxlRpcException(e);
        } finally {
            futureResponse.removeInvokerFuture();
        }
    }
XxlRpcFutureResponse implements thread waiting and notification. The setResponse method stores the response and notifies the waiting lock:
    public void setResponse(XxlRpcResponse response) {
        this.response = response;
        synchronized (lock) {
            done = true;
            lock.notifyAll();
        }
    }
    public XxlRpcResponse get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        if (!done) {
            synchronized (lock) {
                try {
                    if (timeout < 0) {
                        lock.wait();
                    } else {
                        long timeoutMillis = (TimeUnit.MILLISECONDS == unit) ? timeout : TimeUnit.MILLISECONDS.convert(timeout, unit);
                        lock.wait(timeoutMillis);
                    }
                } catch (InterruptedException e) {
                    throw e;
                }
            }
        }
        if (!done) {
            throw new XxlRpcException("xxl-rpc, request timeout at:" + System.currentTimeMillis() + ", request:" + request.toString());
        }
        return response;
    }
Each remote call carries a UUID request ID, which acts as a key to locate the corresponding XxlRpcFutureResponse and wake the correct thread.
    public void notifyInvokerFuture(String requestId, final XxlRpcResponse xxlRpcResponse) {
        final XxlRpcFutureResponse futureResponse = futureResponsePool.get(requestId);
        if (futureResponse == null) {
            return;
        }
        if (futureResponse.getInvokeCallback() != null) {
            try {
                executeResponseCallback(new Runnable() {
                    @Override
                    public void run() {
                        if (xxlRpcResponse.getErrorMsg() != null) {
                            futureResponse.getInvokeCallback().onFailure(new XxlRpcException(xxlRpcResponse.getErrorMsg()));
                        } else {
                            futureResponse.getInvokeCallback().onSuccess(xxlRpcResponse.getResult());
                        }
                    }
                });
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        } else {
            futureResponse.setResponse(xxlRpcResponse);
        }
        futureResponsePool.remove(requestId);
    }
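The two pieces, a future that blocks the caller and a pool keyed by request ID, can be combined in a small standalone sketch. All names here (PendingCallsSketch, ResponseFuture, register, onResponse) are hypothetical and the payload is a plain String; it illustrates the pattern rather than reproducing xxl-rpc. One deliberate difference from the excerpt: the done flag is tested inside the synchronized block in a loop, which is the usual guard against spurious wakeups and against a notification that fires just before the wait begins.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

class PendingCallsSketch {

    // One-shot result holder; the caller blocks in get() until complete() runs.
    static final class ResponseFuture {
        private final Object lock = new Object();
        private boolean done;
        private String response;

        void complete(String value) {
            synchronized (lock) {
                response = value;
                done = true;
                lock.notifyAll();
            }
        }

        // Returns the response, or null if the timeout elapses or the thread is interrupted.
        String get(long timeoutMillis) {
            long deadline = System.currentTimeMillis() + timeoutMillis;
            synchronized (lock) {
                while (!done) {                          // re-check the flag under the lock
                    long remaining = deadline - System.currentTimeMillis();
                    if (remaining <= 0) {
                        return null;
                    }
                    try {
                        lock.wait(remaining);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return null;
                    }
                }
                return response;
            }
        }
    }

    private final Map<String, ResponseFuture> pending = new ConcurrentHashMap<>();

    // Registers a future under a fresh request ID before the request is sent.
    String register(ResponseFuture future) {
        String requestId = UUID.randomUUID().toString();
        pending.put(requestId, future);
        return requestId;
    }

    // Called by the I/O thread when a response arrives; unknown IDs are ignored.
    boolean onResponse(String requestId, String payload) {
        ResponseFuture future = pending.remove(requestId);
        if (future == null) {
            return false;
        }
        future.complete(payload);
        return true;
    }

    public static void main(String[] args) {
        PendingCallsSketch calls = new PendingCallsSketch();
        ResponseFuture future = new ResponseFuture();
        String id = calls.register(future);
        // Simulate the response arriving on another thread.
        new Thread(() -> calls.onResponse(id, "ok")).start();
        System.out.println(future.get(1000));
    }
}
```

Removing the entry from the map when the response arrives (or when the caller gives up) keeps the pool from growing with abandoned requests, which is the role removeInvokerFuture and futureResponsePool.remove play in the excerpts above.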
Code Ape Tech Column
Former Ant Group P8 engineer, pure technologist, sharing full‑stack Java, job interview and career advice through a column. Site: java-family.cn
