How xxl-job Leverages Netty and Dynamic Proxies for High‑Performance RPC

This article explains how xxl‑job uses Netty HTTP, dynamic proxies, and fully asynchronous processing to implement a high‑throughput RPC mechanism, detailing the communication layer, overall workflow, and the clever design that wraps async calls into synchronous‑looking code.

Code Ape Tech Column

Communication Layer Overview

Overall Communication Process

Stunning Design

Communication Layer Overview

xxl-job communicates over Netty HTTP. The RPC layer also supports Mina, Jetty, and Netty TCP, but the xxl-job code hard‑codes Netty HTTP, so the other transports are not selectable in practice.

Overall Communication Process

Taking the case of a scheduler notifying an executor to run a task as the example, the activity diagram below shows the flow.

[image: activity diagram of the scheduler-to-executor call flow]

Stunning Design

The implementation combines Netty with multithreading so that network I/O threads hand work off quickly and never wait on job execution.

Key design highlights:

Dynamic Proxy hides communication details

xxl-job defines two interfaces, ExecutorBiz and AdminBiz, which encapsulate operations such as heartbeat, pause, trigger, and registration. Their implementations contain no communication logic; instead, XxlRpcReferenceBean.getObject() creates a proxy that turns each method call into a remote call.
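
To make the idea concrete, here is a minimal sketch of how a JDK dynamic proxy can hide a remote call behind a plain interface. It is not xxl-job's actual code: DemoBiz, Transport, and RpcProxyFactory are hypothetical names introduced for illustration, and the transport is a stub interface standing in for a real Netty client.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

// Hypothetical business interface; it stands in for something like ExecutorBiz.
interface DemoBiz {
    String run(String jobName);
    String beat();
}

// Hypothetical transport; a real client would serialize the call and send it
// over the network instead of answering locally.
interface Transport {
    Object send(String address, String methodName, Object[] params);
}

final class RpcProxyFactory {
    private RpcProxyFactory() {}

    // Returns an object that implements iface and forwards every call to the transport.
    static <T> T create(Class<T> iface, String address, Transport transport) {
        InvocationHandler handler = (proxy, method, args) -> {
            // toString, hashCode and equals are answered locally, not remotely.
            if (method.getDeclaringClass() == Object.class) {
                switch (method.getName()) {
                    case "toString": return iface.getSimpleName() + " proxy for " + address;
                    case "hashCode": return System.identityHashCode(proxy);
                    default:         return proxy == args[0]; // equals
                }
            }
            // The JDK passes null for methods without parameters.
            Object[] safeArgs = (args == null) ? new Object[0] : args;
            return transport.send(address, method.getName(), safeArgs);
        };
        Object instance = Proxy.newProxyInstance(
                iface.getClassLoader(), new Class<?>[] {iface}, handler);
        return iface.cast(instance);
    }
}
```

A caller obtains a DemoBiz from RpcProxyFactory.create and invokes run or beat like any local method; the serialization and networking stay entirely inside the Transport implementation.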

Fully Asynchronous Processing

When the executor receives a message, it deserializes it and stores the task information in a LinkedBlockingQueue; a separate thread picks up tasks from the queue and executes them. Results are placed into a callback queue and returned asynchronously. Because the Netty worker thread only has to enqueue the task, it is released quickly, which increases throughput.
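
The hand-off described above can be sketched with two queues and one worker thread. This is an illustration under simplifying assumptions, not the executor's real code: JobPipeline and its method names are hypothetical, jobs are plain strings, and "executing" a job just prefixes its name.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical pipeline: the I/O thread only enqueues, a worker thread executes,
// and results are published on a separate callback queue.
final class JobPipeline {
    private final BlockingQueue<String> triggerQueue = new LinkedBlockingQueue<>();
    private final BlockingQueue<String> callbackQueue = new LinkedBlockingQueue<>();
    private final Thread worker;

    JobPipeline() {
        worker = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    String job = triggerQueue.take();      // blocks until a job arrives
                    callbackQueue.put("done:" + job);      // result is reported later
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();        // exit quietly on shutdown
            }
        }, "job-worker");
        worker.setDaemon(true);
        worker.start();
    }

    // Called from the network thread: returns as soon as the job is queued.
    boolean accept(String job) {
        return triggerQueue.offer(job);
    }

    // Called by a callback thread: waits up to the timeout, returns null if nothing finished.
    String pollResult(long timeout, TimeUnit unit) {
        try {
            return callbackQueue.poll(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    void shutdown() {
        worker.interrupt();
    }
}
```

The point of the design is that accept never waits for a job to run, so the thread that received the network message is free again almost immediately.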

Wrapping Asynchronous Calls as Synchronous

Although the processing is asynchronous underneath, a wrapper around it lets the calling code read as if it were synchronous.

The scheduler-side code (XxlJobTrigger.runExecutor) waits synchronously for the result after invoking the remote executor:

public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address) {
    ReturnT<String> runResult = null;
    try {
        ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
        // proxy call: the request is sent asynchronously, but this line blocks until the response arrives
        runResult = executorBiz.run(triggerParam);
    } catch (Exception e) {
        logger.error(">>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
        runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
    }
    StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
    runResultSB.append("<br>address:").append(address);
    runResultSB.append("<br>code:").append(runResult.getCode());
    runResultSB.append("<br>msg:").append(runResult.getMsg());
    runResult.setMsg(runResultSB.toString());
    return runResult;
}

ExecutorBiz.run is a dynamic‑proxy call; the scheduler thread blocks until the executor's response arrives, at which point the waiting thread is notified.

The dynamic proxy implementation decides between synchronous and asynchronous call types. For synchronous calls it creates an XxlRpcFutureResponse, sends the request asynchronously, then waits for the response:

if (CallType.SYNC == callType) {
    XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, null);
    try {
        client.asyncSend(finalAddress, xxlRpcRequest);
        XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, TimeUnit.MILLISECONDS);
        if (xxlRpcResponse.getErrorMsg() != null) {
            throw new XxlRpcException(xxlRpcResponse.getErrorMsg());
        }
        return xxlRpcResponse.getResult();
    } catch (Exception e) {
        logger.info(">>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);
        throw (e instanceof XxlRpcException) ? e : new XxlRpcException(e);
    } finally {
        futureResponse.removeInvokerFuture();
    }
}
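
XxlRpcFutureResponse implements the thread waiting and notification. The setResponse method stores the response and wakes the threads waiting on the lock; the get method waits on that lock until the response arrives or the timeout expires: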

public void setResponse(XxlRpcResponse response) {
    this.response = response;
    synchronized (lock) {
        done = true;
        lock.notifyAll();
    }
}
public XxlRpcResponse get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    if (!done) {
        synchronized (lock) {
            try {
                if (timeout < 0) {
                    lock.wait();
                } else {
                    long timeoutMillis = (TimeUnit.MILLISECONDS == unit) ? timeout : TimeUnit.MILLISECONDS.convert(timeout, unit);
                    lock.wait(timeoutMillis);
                }
            } catch (InterruptedException e) {
                throw e;
            }
        }
    }
    if (!done) {
        throw new XxlRpcException("xxl-rpc, request timeout at:" + System.currentTimeMillis() + ", request:" + request.toString());
    }
    return response;
}
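
For comparison, the same wait/notify idea can be written as a small standalone class. The sketch below is a variant, not the library's code: ResponseFuture and complete are hypothetical names, and it re-checks the done flag in a loop while holding the lock, which is the usual guard against spurious wakeups and against a notification that arrives between the check and the wait.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical future: one thread blocks in get(), another calls complete().
final class ResponseFuture<T> {
    private final Object lock = new Object();
    private T response;      // guarded by lock
    private boolean done;    // guarded by lock

    // Called by the I/O thread when the response arrives.
    void complete(T value) {
        synchronized (lock) {
            response = value;
            done = true;
            lock.notifyAll();
        }
    }

    // Called by the business thread; blocks until completed or the timeout elapses.
    T get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        synchronized (lock) {
            while (!done) {
                long remainingNanos = deadline - System.nanoTime();
                if (remainingNanos <= 0) {
                    throw new TimeoutException("no response within " + timeout + " " + unit);
                }
                // Releases the lock while waiting and re-acquires it on wakeup.
                TimeUnit.NANOSECONDS.timedWait(lock, remainingNanos);
            }
            return response;
        }
    }
}
```

Tracking a deadline rather than a fixed wait time means an early wakeup does not restart the full timeout.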

Each remote call carries a UUID request ID, which acts as a key to locate the corresponding XxlRpcFutureResponse and wake the correct thread.

public void notifyInvokerFuture(String requestId, final XxlRpcResponse xxlRpcResponse) {
    final XxlRpcFutureResponse futureResponse = futureResponsePool.get(requestId);
    if (futureResponse == null) {
        return;
    }
    if (futureResponse.getInvokeCallback() != null) {
        try {
            executeResponseCallback(new Runnable() {
                @Override
                public void run() {
                    if (xxlRpcResponse.getErrorMsg() != null) {
                        futureResponse.getInvokeCallback().onFailure(new XxlRpcException(xxlRpcResponse.getErrorMsg()));
                    } else {
                        futureResponse.getInvokeCallback().onSuccess(xxlRpcResponse.getResult());
                    }
                }
            });
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    } else {
        futureResponse.setResponse(xxlRpcResponse);
    }
    futureResponsePool.remove(requestId);
}
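
The request-ID lookup can be illustrated in isolation. The following sketch swaps in the JDK's CompletableFuture for XxlRpcFutureResponse and uses a hypothetical PendingRequests class; it only shows how a UUID key ties a response back to the caller that is waiting for it.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry of in-flight requests, keyed by request ID.
final class PendingRequests {
    private final Map<String, CompletableFuture<String>> pool = new ConcurrentHashMap<>();

    // Registers an in-flight request and returns the ID to send along with it.
    String register(CompletableFuture<String> future) {
        String requestId = UUID.randomUUID().toString();
        pool.put(requestId, future);
        return requestId;
    }

    // Called when a response arrives. Returns false for unknown IDs,
    // for example a late response whose caller already timed out.
    boolean complete(String requestId, String result) {
        CompletableFuture<String> future = pool.remove(requestId);
        return future != null && future.complete(result);
    }

    // Caller-side cleanup, playing the role of the finally block in the proxy code.
    void discard(String requestId) {
        pool.remove(requestId);
    }

    int size() {
        return pool.size();
    }
}
```

Because each response is matched by ID rather than by arrival order, many requests can share one connection and still wake the right waiting thread.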
Written by

Code Ape Tech Column

Former Ant Group P8 engineer, pure technologist, sharing full‑stack Java, job interview and career advice through a column. Site: java-family.cn
