How to Build a High‑Performance Java RPC Framework with Netty, Zookeeper, and Custom Load Balancing

This article walks through the design and implementation of a Java RPC framework. It covers the added serialization options, multiple load‑balancing strategies, client‑side service caching, long‑lived TCP connections with Netty, and performance testing, and it links to the full source code so the results can be reproduced.

Code Ape Tech Column

Problem and Motivation

The original hand‑written RPC example used only Java serialization, opened a new TCP connection for every request, and lacked load balancing, caching, and a server‑side thread pool. Under moderate concurrency the Netty client leaked memory, and 10 000 requests took more than 130 seconds in total to complete.

RPC Basics

Remote Procedure Call (RPC) enables a client to invoke a method on a remote server as if it were a local call. The client stub serialises the target class name, method name and arguments into a binary payload, sends it over a socket, and the server stub deserialises the payload, invokes the local method and returns the result.
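
The round trip described above can be sketched with nothing but the JDK. This is a minimal illustration, not the framework's code: DemoRequest, GreetingService and RpcStubSketch are hypothetical names, Java serialization stands in for the wire format, and the byte array stands in for the socket.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.reflect.Method;

// Hypothetical request envelope: names the target class, the method and its arguments.
class DemoRequest implements Serializable {
    private static final long serialVersionUID = 1L;
    final String className;
    final String methodName;
    final Class<?>[] paramTypes;
    final Object[] args;

    DemoRequest(String className, String methodName, Class<?>[] paramTypes, Object[] args) {
        this.className = className;
        this.methodName = methodName;
        this.paramTypes = paramTypes;
        this.args = args;
    }
}

// Hypothetical service that lives on the "server" side.
class GreetingService {
    public String greet(String name) {
        return "hello " + name;
    }
}

class RpcStubSketch {
    // Client stub: turn the call description into the bytes that would be written to the socket.
    static byte[] marshal(DemoRequest request) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(request);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    // Server stub: rebuild the request, find the method reflectively and invoke it.
    // A real server would look up a registered service instance instead of creating one.
    static Object dispatch(byte[] payload) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(payload))) {
            DemoRequest request = (DemoRequest) in.readObject();
            Class<?> target = Class.forName(request.className);
            Object instance = target.getDeclaredConstructor().newInstance();
            Method method = target.getMethod(request.methodName, request.paramTypes);
            return method.invoke(instance, request.args);
        } catch (IOException | ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        DemoRequest request = new DemoRequest(GreetingService.class.getName(), "greet",
                new Class<?>[] {String.class}, new Object[] {"rpc"});
        System.out.println(dispatch(marshal(request))); // prints "hello rpc"
    }
}
```

In the framework itself the payload travels over a Netty channel and the result is marshalled back the same way; the sketch only shows the serialize, look up, invoke sequence.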

Technical Enhancements

Serialization Options

Three message codecs are now supported:

Java native serialization (default)

Protobuf (high‑performance binary format)

Kryo (fast, compact serialization)

The protocol is selected via the protocol property in RpcConfig.

Dynamic Load‑Balancing

Each algorithm implements the LoadBalance interface. The LoadBalanceAno annotation gives the implementation a name, and Java's SPI mechanism (ServiceLoader) registers the classes.

/**
 * Load‑balancing annotation
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface LoadBalanceAno {
    String value() default "";
}

/**
 * Round‑robin algorithm
 */
@LoadBalanceAno(RpcConstant.BALANCE_ROUND)
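public class FullRoundBalance implements LoadBalance {
    private static final Logger logger = LoggerFactory.getLogger(FullRoundBalance.class);
    // Guarded by the synchronized method, so volatile is not required
    private int index;

    @Override
    public synchronized Service chooseOne(List<Service> services) {
        // Use >= rather than ==: the provider list can shrink between calls,
        // which would otherwise leave index past the end of the list
        if (index >= services.size()) {
            index = 0;
        }
        return services.get(index++);
    }
}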
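
As an alternative to the synchronized method, round‑robin selection can also be written lock‑free. The sketch below swaps in an AtomicInteger counter. It is not taken from the repository, and AtomicRoundRobin is a hypothetical class that works on plain strings rather than Service objects.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class AtomicRoundRobin<T> {
    private final AtomicInteger counter = new AtomicInteger();

    T chooseOne(List<T> candidates) {
        if (candidates.isEmpty()) {
            throw new IllegalArgumentException("no candidates available");
        }
        // floorMod keeps the index non-negative even after the counter overflows,
        // and the modulo adapts automatically when the list size changes
        int index = Math.floorMod(counter.getAndIncrement(), candidates.size());
        return candidates.get(index);
    }

    public static void main(String[] args) {
        AtomicRoundRobin<String> balance = new AtomicRoundRobin<>();
        List<String> nodes = Arrays.asList("10.0.0.1:9999", "10.0.0.2:9999");
        for (int i = 0; i < 4; i++) {
            System.out.println(balance.chooseOne(nodes)); // alternates between the two addresses
        }
    }
}
```

The trade‑off is that callers never block each other, at the cost of the rotation position being only approximately fair while the provider list is changing.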

Implementations are listed in META-INF/services/cn.sp.rpc.loadbalance.LoadBalance. The configuration class exposes a loadBalance property (default random).

@ConfigurationProperties(prefix = "sp.rpc")
public class RpcConfig {
    private String registerAddress = "127.0.0.1:2181";
    private Integer serverPort = 9999;
    private String protocol = "java";
    private String loadBalance = "random"; // default algorithm
    private Integer weight = 1;
    // getters & setters omitted
}
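
The weight property suggests weight‑aware selection, but the article does not show how the value is consumed. The following is therefore only one common approach, weighted random choice, written against a hypothetical WeightedNode type rather than the framework's Service class.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Hypothetical provider descriptor; only the address and the weight matter here.
class WeightedNode {
    final String address;
    final int weight;

    WeightedNode(String address, int weight) {
        this.address = address;
        this.weight = weight;
    }
}

class WeightedRandomSketch {
    // Picks a node with probability weight / totalWeight. Weights are assumed non-negative.
    static WeightedNode chooseOne(List<WeightedNode> nodes, Random random) {
        int total = 0;
        for (WeightedNode node : nodes) {
            total += node.weight;
        }
        if (total <= 0) {
            throw new IllegalArgumentException("total weight must be positive");
        }
        int ticket = random.nextInt(total); // uniform in [0, total)
        for (WeightedNode node : nodes) {
            ticket -= node.weight;
            if (ticket < 0) {
                return node;
            }
        }
        throw new IllegalStateException("unreachable when weights are non-negative");
    }

    public static void main(String[] args) {
        List<WeightedNode> nodes = Arrays.asList(
                new WeightedNode("10.0.0.1:9999", 1),
                new WeightedNode("10.0.0.2:9999", 3));
        System.out.println(chooseOne(nodes, new Random()).address);
    }
}
```

With weights 1 and 3, the second node should receive roughly three quarters of the calls over a long run.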

During auto‑configuration the appropriate implementation is loaded via ServiceLoader:

private LoadBalance getLoadBalance(String name) {
    ServiceLoader<LoadBalance> loader = ServiceLoader.load(LoadBalance.class);
    for (LoadBalance lb : loader) {
        LoadBalanceAno ano = lb.getClass().getAnnotation(LoadBalanceAno.class);
        Assert.notNull(ano, "load balance name can not be empty!");
        if (name.equals(ano.value())) {
            return lb;
        }
    }
    throw new RpcException("invalid load balance config");
}
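
The annotation‑matching step can be tried in isolation. In this sketch an explicit list stands in for the ServiceLoader result, since a real SPI lookup needs the provider file on the classpath. StrategyName, DemoBalance, FirstBalance and LastBalance are hypothetical stand‑ins for LoadBalanceAno and the framework's implementations.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Arrays;
import java.util.List;

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME) // must be RUNTIME, otherwise getAnnotation returns null
@interface StrategyName {
    String value();
}

interface DemoBalance {
    String chooseOne(List<String> nodes);
}

@StrategyName("first")
class FirstBalance implements DemoBalance {
    @Override
    public String chooseOne(List<String> nodes) {
        return nodes.get(0);
    }
}

@StrategyName("last")
class LastBalance implements DemoBalance {
    @Override
    public String chooseOne(List<String> nodes) {
        return nodes.get(nodes.size() - 1);
    }
}

class StrategyLookupSketch {
    // Returns the candidate whose annotation value equals the configured name.
    static DemoBalance find(String name, Iterable<DemoBalance> candidates) {
        for (DemoBalance candidate : candidates) {
            StrategyName annotation = candidate.getClass().getAnnotation(StrategyName.class);
            if (annotation != null && name.equals(annotation.value())) {
                return candidate;
            }
        }
        throw new IllegalArgumentException("unknown strategy: " + name);
    }

    public static void main(String[] args) {
        List<DemoBalance> candidates = Arrays.<DemoBalance>asList(new FirstBalance(), new LastBalance());
        DemoBalance chosen = find("last", candidates);
        System.out.println(chosen.chooseOne(Arrays.asList("a", "b"))); // prints "b"
    }
}
```

Because ServiceLoader implements Iterable, the same find method would accept ServiceLoader.load(DemoBalance.class) directly in a project that ships the provider file.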

Local Service‑List Cache

The client caches the list of provider addresses per service name in a static ConcurrentHashMap<String, List<Service>>. Cache look‑up is performed before querying Zookeeper.

public class ServerDiscoveryCache {
    private static final Map<String, List<Service>> SERVER_MAP = new ConcurrentHashMap<>();
    public static final List<String> SERVICE_CLASS_NAMES = new ArrayList<>();
    public static void put(String serviceName, List<Service> serviceList) {
        SERVER_MAP.put(serviceName, serviceList);
    }
    public static List<Service> get(String serviceName) {
        return SERVER_MAP.get(serviceName);
    }
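    public static boolean isEmpty(String serviceName) {
        // Read the entry once: a concurrent removeAll between two lookups could otherwise cause a NullPointerException
        List<Service> services = SERVER_MAP.get(serviceName);
        return services == null || services.isEmpty();
    }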
    public static void removeAll(String serviceName) {
        SERVER_MAP.remove(serviceName);
    }
}

If a provider node disappears (Zookeeper deletes its ephemeral node when the provider's session ends), a child‑change listener clears the corresponding cache entry so that the client does not keep stale references.

public class ZkChildListenerImpl implements IZkChildListener {
    private static Logger logger = LoggerFactory.getLogger(ZkChildListenerImpl.class);
    @Override
    public void handleChildChange(String parentPath, List<String> childList) throws Exception {
        logger.debug("Child change parentPath:[{}] -- childList:[{}]", parentPath, childList);
        String[] arr = parentPath.split("/");
        ServerDiscoveryCache.removeAll(arr[2]); // paths start with "/", so arr[0] is empty and arr[2] holds the service name
    }
}
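
The cache‑first lookup and the listener‑driven invalidation described above fit together as follows. This is a sketch under stated assumptions: a Function stands in for the Zookeeper query, addresses are plain strings, and the path layout /rpc/&lt;serviceName&gt;/service is hypothetical, chosen only so that the service name lands at index 2 after splitting.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

class DiscoveryCacheSketch {
    private final Map<String, List<String>> cache = new ConcurrentHashMap<>();
    // Stand-in for the registry query (Zookeeper in the article)
    private final Function<String, List<String>> registryLookup;

    DiscoveryCacheSketch(Function<String, List<String>> registryLookup) {
        this.registryLookup = registryLookup;
    }

    // Serve from the local cache when possible, otherwise ask the registry and remember the answer.
    List<String> findProviders(String serviceName) {
        List<String> cached = cache.get(serviceName);
        if (cached != null && !cached.isEmpty()) {
            return cached;
        }
        List<String> fresh = registryLookup.apply(serviceName);
        if (fresh != null && !fresh.isEmpty()) {
            cache.put(serviceName, fresh);
        }
        return fresh;
    }

    // Mirrors the child-change listener: drop the entry so the next call re-reads the registry.
    void onChildChange(String parentPath) {
        // "/rpc/x/service".split("/") yields ["", "rpc", "x", "service"]
        String[] segments = parentPath.split("/");
        if (segments.length > 2) {
            cache.remove(segments[2]);
        }
    }

    public static void main(String[] args) {
        AtomicInteger registryCalls = new AtomicInteger();
        DiscoveryCacheSketch discovery = new DiscoveryCacheSketch(name -> {
            registryCalls.incrementAndGet();
            return Arrays.asList("127.0.0.1:9999");
        });
        discovery.findProviders("demo.UserService");
        discovery.findProviders("demo.UserService");       // served from the cache
        discovery.onChildChange("/rpc/demo.UserService/service");
        discovery.findProviders("demo.UserService");       // registry is queried again
        System.out.println(registryCalls.get());           // prints 2
    }
}
```

Invalidating the whole entry, rather than patching the list, keeps the listener trivial: the next request simply pays for one registry read.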

Netty Client with Persistent TCP Connections

The client now keeps long‑lived connections instead of opening one per request. Each provider address maps to a SendHandlerV2 instance stored in connectedServerNodes. Connections are established asynchronously on a bounded thread pool (4 core threads, at most 10).

public class NettyNetClient implements NetClient {
    private static final Logger logger = LoggerFactory.getLogger(NettyNetClient.class);
    // Bounded pool used to open connections off the caller thread
    private static final ExecutorService threadPool = new ThreadPoolExecutor(
        4, 10, 200, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(1000),
        new ThreadFactoryBuilder().setNameFormat("rpcClient-%d").build()
    );
    private final EventLoopGroup loopGroup = new NioEventLoopGroup(4);
    // address ("host:port") -> handler bound to the long-lived channel
    public static final Map<String, SendHandlerV2> connectedServerNodes = new ConcurrentHashMap<>();

    @Override
    public RpcResponse sendRequest(RpcRequest rpcRequest, Service service, MessageProtocol messageProtocol) {
        String address = service.getAddress();
        // computeIfAbsent is atomic per key, so concurrent callers share one handler per address.
        // This replaces synchronized (address), which locked on a String instance and
        // held the lock while blocking for the response.
        SendHandlerV2 handler = connectedServerNodes.computeIfAbsent(address, addr -> {
            logger.info("Using new connection to [{}]", addr);
            SendHandlerV2 created = new SendHandlerV2(messageProtocol, addr);
            threadPool.submit(() -> connect(addr, created));
            return created;
        });
        // The handler blocks until its channel is ready, then until the response arrives
        return handler.sendRequest(rpcRequest);
    }

    private void connect(String address, SendHandlerV2 handler) {
        String[] parts = address.split(":");
        String host = parts[0];
        int port = Integer.parseInt(parts[1]);
        Bootstrap b = new Bootstrap();
        b.group(loopGroup)
         .channel(NioSocketChannel.class)
         .option(ChannelOption.TCP_NODELAY, true)
         .handler(new ChannelInitializer<SocketChannel>() {
             @Override
             protected void initChannel(SocketChannel ch) {
                 ch.pipeline().addLast(handler);
             }
         });
        b.connect(host, port).addListener((ChannelFutureListener) future -> {
            if (!future.isSuccess()) {
                // Drop the handler so that the next request retries the connection
                logger.error("connect to [{}] failed", address, future.cause());
                connectedServerNodes.remove(address, handler);
            }
        });
    }
}

SendHandlerV2 – Netty Channel Logic

The handler captures the Netty channel once it is ready and signals this through a CountDownLatch. A sender waits on the latch, writes the marshalled request, and then blocks until the response arrives or the wait times out.

public class SendHandlerV2 extends ChannelInboundHandlerAdapter {
    private static final Logger logger = LoggerFactory.getLogger(SendHandlerV2.class);
    private static final int CHANNEL_WAIT_TIME = 4; // seconds
    private static final int RESPONSE_WAIT_TIME = 8; // seconds
    private volatile Channel channel;
    private final MessageProtocol messageProtocol;
    private final String remoteAddress;
    // Pending calls keyed by request ID
    private static final Map<String, RpcFuture<RpcResponse>> requestMap = new ConcurrentHashMap<>();
    private final CountDownLatch latch = new CountDownLatch(1);

    public SendHandlerV2(MessageProtocol mp, String addr) {
        this.messageProtocol = mp;
        this.remoteAddress = addr;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        // channelRegistered fires before the TCP connect completes,
        // so release waiting senders only once the channel is active
        this.channel = ctx.channel();
        latch.countDown();
    }

    public RpcResponse sendRequest(RpcRequest request) {
        RpcFuture<RpcResponse> future = new RpcFuture<>();
        // Register the future before writing so that a fast response cannot be missed
        requestMap.put(request.getRequestId(), future);
        try {
            byte[] data = messageProtocol.marshallingRequest(request);
            ByteBuf buf = Unpooled.buffer(data.length);
            buf.writeBytes(data);
            if (latch.await(CHANNEL_WAIT_TIME, TimeUnit.SECONDS)) {
                channel.writeAndFlush(buf);
                return future.get(RESPONSE_WAIT_TIME, TimeUnit.SECONDS);
            } else {
                throw new RpcException("establish channel time out");
            }
        } catch (Exception e) {
            throw new RpcException(e.getMessage());
        } finally {
            // Always clean up, otherwise timed-out requests accumulate in the map
            requestMap.remove(request.getRequestId());
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf byteBuf = (ByteBuf) msg;
        byte[] resp = new byte[byteBuf.readableBytes()];
        byteBuf.readBytes(resp);
        // Release the buffer explicitly; this handler is the last consumer of the message
        ReferenceCountUtil.release(byteBuf);
        RpcResponse response = messageProtocol.unmarshallingResponse(resp);
        // Match the response to the waiting caller by request ID
        RpcFuture<RpcResponse> future = requestMap.get(response.getRequestId());
        if (future != null) {
            future.setResponse(response);
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        logger.error("channel inactive with remoteAddress:[{}]", remoteAddress);
        // Forget the dead connection so that the next request reconnects
        NettyNetClient.connectedServerNodes.remove(remoteAddress);
    }
}
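
Matching responses to callers by request ID is what allows several in‑flight requests to share one connection. The sketch below isolates that idea using the JDK's CompletableFuture in place of the framework's RpcFuture. PendingRequests is a hypothetical class, and strings stand in for RpcResponse.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class PendingRequests {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Called by the sender before the request bytes are written.
    CompletableFuture<String> register(String requestId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(requestId, future);
        return future;
    }

    // Called by the I/O thread when a response arrives; false means unknown or expired ID.
    boolean complete(String requestId, String response) {
        CompletableFuture<String> future = pending.remove(requestId);
        return future != null && future.complete(response);
    }

    // Blocks the caller until its own response arrives or the timeout elapses.
    String await(String requestId, CompletableFuture<String> future, long timeout, TimeUnit unit) {
        try {
            return future.get(timeout, unit);
        } catch (TimeoutException e) {
            throw new IllegalStateException("no response for request " + requestId, e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pending.remove(requestId); // never leave a timed-out entry behind
        }
    }

    public static void main(String[] args) {
        PendingRequests requests = new PendingRequests();
        CompletableFuture<String> first = requests.register("1");
        CompletableFuture<String> second = requests.register("2");
        // Responses may arrive in any order; the ID routes each one to the right caller
        requests.complete("2", "second");
        requests.complete("1", "first");
        System.out.println(requests.await("1", first, 1, TimeUnit.SECONDS));  // prints "first"
        System.out.println(requests.await("2", second, 1, TimeUnit.SECONDS)); // prints "second"
    }
}
```

A response whose ID is no longer registered, for example one that arrives after the caller timed out, is simply dropped.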

RpcFuture – Blocking Future Implementation

public class RpcFuture<T> implements Future<T> {
    private T response;
    private CountDownLatch latch = new CountDownLatch(1);
    private long beginTime = System.currentTimeMillis();

    public void setResponse(T resp) {
        this.response = resp;
        latch.countDown();
    }

    @Override
    public T get() throws InterruptedException, ExecutionException {
        latch.await();
        return response;
    }

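    @Override
    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        if (latch.await(timeout, unit)) {
            return response;
        }
        // Follow the Future contract: signal a timeout instead of returning null
        throw new TimeoutException("no response within " + timeout + " " + unit);
    }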

    // cancel, isCancelled, isDone omitted for brevity
}

Performance Evaluation

Test environment:

Intel(R) Core(TM) i5‑6300HQ, 4 cores

Windows 10 64‑bit

16 GB RAM

Steps:

Start a local Zookeeper instance.

Launch one consumer and two provider services using the round‑robin algorithm.

Run ab -c 4 -n 10000 http://localhost:8080/test/user?id=1 to issue 10 000 HTTP requests at a concurrency level of 4.

Result: the 10 000 requests completed in 11 seconds, a more than 10× speed‑up over the original implementation, which needed over 130 seconds.
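
For reference, the reported totals translate into throughput as follows. Since the original run is only described as taking "over 130 seconds", the figures for it are bounds rather than measurements. BenchmarkArithmetic is a hypothetical helper written for this calculation.

```java
import java.util.Locale;

class BenchmarkArithmetic {
    static double requestsPerSecond(int requests, double seconds) {
        return requests / seconds;
    }

    static double speedUp(double secondsBefore, double secondsAfter) {
        return secondsBefore / secondsAfter;
    }

    public static void main(String[] args) {
        // New implementation: 10 000 requests in 11 s, roughly 909 requests per second
        System.out.printf(Locale.ROOT, "after:  %.0f req/s%n", requestsPerSecond(10_000, 11));
        // Original implementation: more than 130 s, so at most about 77 requests per second
        System.out.printf(Locale.ROOT, "before: %.0f req/s (upper bound)%n", requestsPerSecond(10_000, 130));
        // Ratio of the two totals: at least about 11.8x
        System.out.printf(Locale.ROOT, "speed-up: %.1fx (lower bound)%n", speedUp(130, 11));
    }
}
```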

Source Code

Repository URLs:

https://github.com/2YSP/rpc-spring-boot-starter

https://github.com/2YSP/rpc-example

Reference: https://www.cnblogs.com/itoak/p/13370031.html

Written by

Code Ape Tech Column

Former Ant Group P8 engineer, pure technologist, sharing full‑stack Java, job interview and career advice through a column. Site: java-family.cn
