How xxl-job Uses Netty, Dynamic Proxies, and Async Design for High‑Performance RPC
This article explains how xxl-job leverages Netty HTTP, dynamic proxy patterns, and fully asynchronous processing—including thread‑waiting mechanisms and UUID request IDs—to achieve efficient remote procedure calls, with detailed code examples illustrating the scheduler‑executor communication flow.
Communication Layer Introduction
xxl-job uses Netty HTTP for communication; although it also supports Mina, Jetty, and Netty TCP, the code hard‑codes Netty HTTP.
Overall Communication Process
Using the scheduler to notify an executor to run a task, the activity diagram is shown below.
Stunning Design
The design cleverly combines Netty and multithreading, applying them seamlessly.
Dynamic Proxy Pattern Hides Communication Details
xxl-job defines two interfaces, ExecutorBiz and AdminBiz, which encapsulate operations such as heartbeat, pause, trigger execution, registration, and callbacks. Their implementations contain no communication logic.
The XxlRpcReferenceBean#getObject() method creates a proxy that performs remote communication.
Fully Asynchronous Processing
When the executor receives a message, it deserializes it and stores the task information in a LinkedBlockingQueue instead of executing synchronously. The result is also placed into a callback thread’s blocking queue, making the whole flow asynchronous and reducing Netty worker thread time, thus increasing throughput.
Wrapping Asynchronous Calls
The asynchronous handling is wrapped so the code appears as a synchronous call.
Scheduler code that triggers task execution:
public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address) {
ReturnT<String> runResult = null;
try {
ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
// many async processes, finally get result synchronously
runResult = executorBiz.run(triggerParam);
} catch (Exception e) {
logger.error(">>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
}
StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
runResultSB.append("<br>address:").append(address);
runResultSB.append("<br>code:").append(runResult.getCode());
runResultSB.append("<br>msg:").append(runResult.getMsg());
runResult.setMsg(runResultSB.toString());
return runResult;
}ExecutorBiz.run is invoked via the dynamic proxy; the scheduler thread blocks until the executor returns the result, then wakes up.
Dynamic proxy implementation (simplified):
if (CallType.SYNC == callType) {
XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, null);
try {
client.asyncSend(finalAddress, xxlRpcRequest);
XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, TimeUnit.MILLISECONDS);
if (xxlRpcResponse.getErrorMsg() != null) {
throw new XxlRpcException(xxlRpcResponse.getErrorMsg());
}
return xxlRpcResponse.getResult();
} catch (Exception e) {
logger.info(">>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);
throw (e instanceof XxlRpcException) ? e : new XxlRpcException(e);
} finally {
futureResponse.removeInvokerFuture();
}
}XxlRpcFutureResponse implements thread waiting and waking:
public void setResponse(XxlRpcResponse response) {
this.response = response;
synchronized (lock) {
done = true;
lock.notifyAll();
}
}
public XxlRpcResponse get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
if (!done) {
synchronized (lock) {
try {
if (timeout < 0) {
lock.wait();
} else {
long timeoutMillis = (TimeUnit.MILLISECONDS == unit) ? timeout : TimeUnit.MILLISECONDS.convert(timeout, unit);
lock.wait(timeoutMillis);
}
} catch (InterruptedException e) {
throw e;
}
}
}
if (!done) {
throw new XxlRpcException("xxl-rpc, request timeout at:" + System.currentTimeMillis() + ", request:" + request.toString());
}
return response;
}Each remote call generates a UUID request ID that travels through the whole process, acting like a key to locate the corresponding XxlRpcFutureResponse and wake the waiting thread.
Notifier that matches request ID to future response:
public void notifyInvokerFuture(String requestId, final XxlRpcResponse xxlRpcResponse) {
XxlRpcFutureResponse futureResponse = futureResponsePool.get(requestId);
if (futureResponse == null) {
return;
}
if (futureResponse.getInvokeCallback() != null) {
try {
executeResponseCallback(new Runnable() {
@Override
public void run() {
if (xxlRpcResponse.getErrorMsg() != null) {
futureResponse.getInvokeCallback().onFailure(new XxlRpcException(xxlRpcResponse.getErrorMsg()));
} else {
futureResponse.getInvokeCallback().onSuccess(xxlRpcResponse.getResult());
}
}
});
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
} else {
futureResponse.setResponse(xxlRpcResponse);
}
futureResponsePool.remove(requestId);
}Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactand we will review it promptly.
Su San Talks Tech
Su San, former staff at several leading tech companies, is a top creator on Juejin and a premium creator on CSDN, and runs the free coding practice site www.susan.net.cn.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
