How xxl-job Leverages Netty and Dynamic Proxies for High‑Performance RPC
This article explores xxl-job’s underlying communication architecture: how it combines Netty HTTP, the dynamic proxy pattern, fully asynchronous processing, and thread blocking to make remote procedure calls efficient. Code excerpts and an activity diagram accompany the explanation.
Communication Layer Introduction
xxl-job communicates over Netty HTTP by default. Mina, Jetty, and Netty TCP are also supported as transports, but the xxl-job source code hard-codes Netty HTTP, so that is the transport actually used.
Overall Communication Process
The activity diagram below illustrates the flow, using the case of the scheduler notifying an executor to run a task as an example.
Stunning Design Highlights
Dynamic Proxy Pattern – Hiding Communication Details
xxl-job defines two interfaces, ExecutorBiz and AdminBiz, which declare the heartbeat, pause, trigger, registration, and cancellation operations but contain no communication logic. XxlRpcReferenceBean creates a proxy for these interfaces in its getObject() method, and the proxy performs the remote calls, so callers use the interfaces as if they were local objects.
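To make the proxy idea concrete, here is a minimal sketch built on the JDK's java.lang.reflect.Proxy. It is not xxl-job's implementation: DemoBiz, createProxy, and the SENT list are hypothetical names used only for illustration, and the "remote call" is simulated by recording a string instead of sending anything over the network.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ProxySketch {

    // Hypothetical stand-in for a business interface such as ExecutorBiz.
    public interface DemoBiz {
        String run(String jobParam);
        String beat();
    }

    // Records what would go over the wire. A real handler would serialize
    // the request and hand it to the transport instead.
    public static final List<String> SENT = new ArrayList<>();

    // Builds a proxy for any interface; every interface call becomes a "request".
    @SuppressWarnings("unchecked")
    public static <T> T createProxy(Class<T> iface, String address) {
        InvocationHandler handler = (proxy, method, args) -> {
            // Methods inherited from Object are answered locally, not remotely.
            if (method.getDeclaringClass() == Object.class) {
                switch (method.getName()) {
                    case "equals": return proxy == args[0];
                    case "hashCode": return System.identityHashCode(proxy);
                    default: return iface.getSimpleName() + "Proxy@" + address;
                }
            }
            String request = iface.getSimpleName() + "." + method.getName()
                    + (args == null ? "[]" : Arrays.toString(args)) + "@" + address;
            SENT.add(request);
            // Simulated response; assumes every interface method returns String.
            return "OK:" + method.getName();
        };
        return (T) Proxy.newProxyInstance(
                iface.getClassLoader(), new Class<?>[]{iface}, handler);
    }

    public static void main(String[] args) {
        DemoBiz biz = createProxy(DemoBiz.class, "127.0.0.1:9999");
        System.out.println(biz.run("job-1")); // prints "OK:run"
        System.out.println(SENT);             // prints "[DemoBiz.run[job-1]@127.0.0.1:9999]"
    }
}
```

The caller only ever sees DemoBiz. Swapping the transport or the serialization would change the handler, not the calling code, which is the benefit the section describes.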
Fully Asynchronous Processing
When the executor receives a message, it deserializes it and stores the task information in a LinkedBlockingQueue. Separate worker threads pull tasks from the queue and execute them. The results are placed into another blocking queue for callbacks. Because the Netty worker threads only enqueue and return, they spend less time on each request, which increases throughput.
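The two-queue handoff described above can be sketched as follows. This is a simplified illustration under stated assumptions, not the executor's real code: QueueHandoffSketch, accept, and pollResult are hypothetical names, tasks are plain strings, and "running" a task just prefixes it with "done:".

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class QueueHandoffSketch {

    // Tasks accepted by the I/O thread and waiting to be executed.
    private final BlockingQueue<String> triggerQueue = new LinkedBlockingQueue<>();
    // Finished results waiting to be reported back to the scheduler.
    private final BlockingQueue<String> callbackQueue = new LinkedBlockingQueue<>();

    public QueueHandoffSketch() {
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    String task = triggerQueue.take();  // blocks until a task arrives
                    callbackQueue.put("done:" + task);  // pretend the job ran
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();     // stop quietly on interrupt
            }
        }, "job-worker");
        worker.setDaemon(true); // do not keep the JVM alive for this demo
        worker.start();
    }

    // Called on the I/O thread: enqueue and return immediately.
    public boolean accept(String task) {
        return triggerQueue.offer(task);
    }

    // Called by a callback thread: wait up to the timeout for the next result.
    public String pollResult(long timeout, TimeUnit unit) {
        try {
            return callbackQueue.poll(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        QueueHandoffSketch executor = new QueueHandoffSketch();
        executor.accept("job-1");
        System.out.println(executor.pollResult(1, TimeUnit.SECONDS)); // prints "done:job-1"
    }
}
```

The point of the design is that accept never waits for the job to finish, so the thread that received the network message is free to handle the next one.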
Asynchronous Wrapper Making Calls Look Synchronous
The scheduler’s XxlJobTrigger code synchronously waits for the result while the underlying processing is asynchronous. The relevant method is shown below:
public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
    ReturnT<String> runResult = null;
    try {
        ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
        // many async steps, finally get synchronous result
        runResult = executorBiz.run(triggerParam);
    } catch (Exception e) {
        logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
        runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
    }
    StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
    runResultSB.append("<br>address:").append(address);
    runResultSB.append("<br>code:").append(runResult.getCode());
    runResultSB.append("<br>msg:").append(runResult.getMsg());
    runResult.setMsg(runResultSB.toString());
    return runResult;
}
Dynamic Proxy Invocation Logic
The proxy’s synchronous call path creates an XxlRpcFutureResponse, sends the request asynchronously, then blocks on futureResponse.get() until the response arrives or the timeout expires.
if (CallType.SYNC == callType) {
    // future-response set
    XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, null);
    try {
        // do invoke
        client.asyncSend(finalAddress, xxlRpcRequest);
        // future get
        XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, TimeUnit.MILLISECONDS);
        if (xxlRpcResponse.getErrorMsg() != null) {
            throw new XxlRpcException(xxlRpcResponse.getErrorMsg());
        }
        return xxlRpcResponse.getResult();
    } catch (Exception e) {
        logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);
        throw (e instanceof XxlRpcException) ? e : new XxlRpcException(e);
    } finally {
        // future-response remove
        futureResponse.removeInvokerFuture();
    }
}
Future Response Implementation
XxlRpcFutureResponse stores the response, notifies waiting threads, and provides a timed get method.
public void setResponse(XxlRpcResponse response) {
    this.response = response;
    synchronized (lock) {
        done = true;
        lock.notifyAll();
    }
}
@Override
public XxlRpcResponse get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    if (!done) {
        synchronized (lock) {
            try {
                if (timeout < 0) {
                    lock.wait();
                } else {
                    long timeoutMillis = (TimeUnit.MILLISECONDS == unit) ? timeout : TimeUnit.MILLISECONDS.convert(timeout, unit);
                    lock.wait(timeoutMillis);
                }
            } catch (InterruptedException e) {
                throw e;
            }
        }
    }
    if (!done) {
        throw new XxlRpcException("xxl-rpc, request timeout at:" + System.currentTimeMillis() + ", request:" + request.toString());
    }
    return response;
}
Mapping Responses to Requests via UUID
Each remote call carries a UUID request ID that acts as a key. When the executor returns a response, the framework looks up the corresponding XxlRpcFutureResponse using this ID and invokes setResponse to wake the waiting thread.
public void notifyInvokerFuture(String requestId, final XxlRpcResponse xxlRpcResponse) {
    // find future response by requestId
    final XxlRpcFutureResponse futureResponse = futureResponsePool.get(requestId);
    if (futureResponse == null) {
        return;
    }
    if (futureResponse.getInvokeCallback() != null) {
        try {
            executeResponseCallback(new Runnable() {
                @Override
                public void run() {
                    if (xxlRpcResponse.getErrorMsg() != null) {
                        futureResponse.getInvokeCallback().onFailure(new XxlRpcException(xxlRpcResponse.getErrorMsg()));
                    } else {
                        futureResponse.getInvokeCallback().onSuccess(xxlRpcResponse.getResult());
                    }
                }
            });
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    } else {
        // direct notify
        futureResponse.setResponse(xxlRpcResponse);
    }
    // remove mapping
    futureResponsePool.remove(requestId);
}
Through these mechanisms, xxl-job achieves a seamless blend of asynchronous execution and synchronous‑style result retrieval, delivering high throughput and low latency in distributed job scheduling.
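As a recap of the blocking-future and request-ID techniques covered in this article, the following is a minimal, self-contained sketch of the same pattern. It deliberately swaps in the JDK's CompletableFuture for the wait/notify logic of XxlRpcFutureResponse, and it simulates the remote side with a local thread pool. CorrelationSketch, invoke, onResponse, and pendingCount are hypothetical names, not part of xxl-job or xxl-rpc.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class CorrelationSketch {

    // Pending calls keyed by request ID; plays the role of futureResponsePool.
    private static final Map<String, CompletableFuture<String>> PENDING =
            new ConcurrentHashMap<>();

    // Simulates the network plus the remote executor (assumption for the demo).
    private static final ExecutorService REMOTE = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r, "fake-remote");
        t.setDaemon(true);
        return t;
    });

    // Called when a response arrives: find the waiting future by ID and complete it.
    static void onResponse(String requestId, String result) {
        CompletableFuture<String> future = PENDING.remove(requestId);
        if (future != null) {
            future.complete(result); // wakes the thread blocked in invoke()
        }
    }

    // Looks synchronous to the caller, although the send itself does not block.
    public static String invoke(String payload, long timeoutMillis) {
        String requestId = UUID.randomUUID().toString();
        CompletableFuture<String> future = new CompletableFuture<>();
        PENDING.put(requestId, future);
        try {
            // Asynchronous "send": the reply is produced on another thread.
            REMOTE.execute(() -> onResponse(requestId, "echo:" + payload));
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting: " + requestId, e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("call failed or timed out: " + requestId, e);
        } finally {
            PENDING.remove(requestId); // never leak an entry, even on timeout
        }
    }

    public static int pendingCount() {
        return PENDING.size();
    }

    public static void main(String[] args) {
        System.out.println(invoke("job-1", 1000)); // prints "echo:job-1"
        System.out.println(pendingCount());        // prints "0"
    }
}
```

Removing the map entry in the finally block mirrors the removeInvokerFuture() call in the proxy code: a request that times out must not leave its future behind in the pool.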
Su San Talks Tech
Su San, former staff at several leading tech companies, is a top creator on Juejin and a premium creator on CSDN, and runs the free coding practice site www.susan.net.cn.