How to Build a High‑Performance Java RPC Framework with Netty, Zookeeper, and Custom Load Balancing
This article walks through the design and implementation of a Java RPC framework: additional serialization options, multiple load‑balancing strategies, client‑side service caching, long‑lived TCP connections with Netty, and a performance test. The full source code is linked at the end so the results can be reproduced.
Problem and Motivation
The original hand‑written RPC example used only Java serialization, created a new TCP connection for every request, and lacked load balancing, caching, and a server‑side thread pool. Under moderate concurrency the Netty client leaked memory, and 10 000 requests took more than 130 seconds in total.
RPC Basics
Remote Procedure Call (RPC) enables a client to invoke a method on a remote server as if it were a local call. The client stub serialises the target class name, method name and arguments into a binary payload, sends it over a socket, and the server stub deserialises the payload, invokes the local method and returns the result.
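The stub mechanism described above can be sketched with a JDK dynamic proxy. This is a minimal illustration, not the framework's code: the Request envelope, the Greeter interface and the in-process Function<byte[], byte[]> that stands in for the socket are all hypothetical names introduced for the example, and Java serialization is used as the payload format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.function.Function;

public class StubSketch {
    /** Hypothetical wire envelope: target class name, method name and arguments. */
    public static class Request implements Serializable {
        final String className;
        final String methodName;
        final Class<?>[] paramTypes;
        final Object[] args;

        Request(String className, String methodName, Class<?>[] paramTypes, Object[] args) {
            this.className = className;
            this.methodName = methodName;
            this.paramTypes = paramTypes;
            this.args = args;
        }
    }

    /** Example service interface shared by client and server. */
    public interface Greeter {
        String greet(String name);
    }

    public static class GreeterImpl implements Greeter {
        @Override
        public String greet(String name) {
            return "hello " + name;
        }
    }

    /** Client stub: a proxy that serialises each call and hands the bytes to the transport. */
    @SuppressWarnings("unchecked")
    public static <T> T clientStub(Class<T> iface, Function<byte[], byte[]> transport) {
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] {iface},
                (proxy, method, args) -> {
                    Request req = new Request(iface.getName(), method.getName(),
                            method.getParameterTypes(), args);
                    return fromBytes(transport.apply(toBytes(req)));
                });
    }

    /** Server stub: deserialises the request, invokes the local method, serialises the result. */
    public static byte[] serverStub(byte[] payload, Object target) {
        try {
            Request req = (Request) fromBytes(payload);
            Method m = Class.forName(req.className).getMethod(req.methodName, req.paramTypes);
            return toBytes(m.invoke(target, req.args));
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    static byte[] toBytes(Object o) {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
             ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
            oos.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static Object fromBytes(byte[] data) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return ois.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Greeter local = new GreeterImpl();
        // In-process "network": the bytes go straight to the server stub.
        Greeter remote = clientStub(Greeter.class, bytes -> serverStub(bytes, local));
        System.out.println(remote.greet("rpc")); // prints "hello rpc"
    }
}
```

In a real deployment the Function would be replaced by a network client, which is the part the rest of the article concentrates on.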
Technical Enhancements
Serialization Options
Three message codecs are now supported:
Java native serialization (default)
Protobuf (high‑performance binary format)
Kryo (fast, compact serialization)
The protocol is selected via the protocol property in RpcConfig.
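How a codec might be looked up by that property can be sketched as follows. The Codec interface, the registry map and the forName method are assumptions made for illustration; the framework's own MessageProtocol type may look different. Only the Java codec is implemented here, because Protobuf and Kryo need external libraries.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

public class CodecSketch {
    /** Hypothetical codec abstraction: object to bytes and back. */
    public interface Codec {
        byte[] marshal(Object message);
        Object unmarshal(byte[] data);
    }

    /** Codec backed by JDK serialization; messages must implement Serializable. */
    public static class JavaCodec implements Codec {
        @Override
        public byte[] marshal(Object message) {
            try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
                 ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(message);
                oos.flush();
                return bos.toByteArray();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }

        @Override
        public Object unmarshal(byte[] data) {
            try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data))) {
                return ois.readObject();
            } catch (IOException | ClassNotFoundException e) {
                throw new IllegalStateException(e);
            }
        }
    }

    // Registry keyed by the value of the "protocol" property; further codecs would be added here.
    private static final Map<String, Codec> CODECS = new HashMap<>();
    static {
        CODECS.put("java", new JavaCodec());
    }

    /** Resolves the codec for a configured protocol name, failing fast on unknown names. */
    public static Codec forName(String protocol) {
        Codec codec = CODECS.get(protocol);
        if (codec == null) {
            throw new IllegalArgumentException("unsupported protocol: " + protocol);
        }
        return codec;
    }

    public static void main(String[] args) {
        Codec codec = forName("java");
        byte[] bytes = codec.marshal("ping");
        System.out.println(codec.unmarshal(bytes)); // prints "ping"
    }
}
```

Failing fast on an unknown name surfaces a misconfigured protocol at start‑up rather than on the first request.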
Dynamic Load‑Balancing
A LoadBalance interface is implemented for each algorithm. An annotation marks the implementation, and SPI registers the classes.
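The interface itself is not reproduced in the article. Judging from the chooseOne(List<Service>) method that the round‑robin class overrides, it presumably has the shape sketched here. The Service class is a stand‑in, and RandomBalance is an illustrative guess at what a random strategy (the configured default) could look like, not the repository's implementation.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

public class RandomBalanceSketch {
    /** Stand-in for the framework's Service type; only the address matters here. */
    public static class Service {
        private final String address;

        public Service(String address) {
            this.address = address;
        }

        public String getAddress() {
            return address;
        }
    }

    /** Assumed shape of the interface, inferred from chooseOne(List<Service>). */
    public interface LoadBalance {
        Service chooseOne(List<Service> services);
    }

    /** Picks a provider uniformly at random; stateless, so no locking is needed. */
    public static class RandomBalance implements LoadBalance {
        @Override
        public Service chooseOne(List<Service> services) {
            if (services == null || services.isEmpty()) {
                throw new IllegalArgumentException("no providers available");
            }
            int i = ThreadLocalRandom.current().nextInt(services.size());
            return services.get(i);
        }
    }

    public static void main(String[] args) {
        List<Service> providers = Arrays.asList(
                new Service("127.0.0.1:9999"), new Service("127.0.0.1:9998"));
        Service chosen = new RandomBalance().chooseOne(providers);
        System.out.println(chosen.getAddress()); // one of the two addresses
    }
}
```

A weighted variant could repeat each provider according to the weight property before drawing, but that is beyond what the article describes.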
/**
 * Load‑balancing annotation
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface LoadBalanceAno {
    String value() default "";
}

/**
 * Round‑robin algorithm
 */
@LoadBalanceAno(RpcConstant.BALANCE_ROUND)
public class FullRoundBalance implements LoadBalance {
    private static Logger logger = LoggerFactory.getLogger(FullRoundBalance.class);
    private volatile int index;

    @Override
    public synchronized Service chooseOne(List<Service> services) {
        // >= rather than ==: the provider list can shrink between calls,
        // which would otherwise leave index past the end of the list
        if (index >= services.size()) {
            index = 0;
        }
        return services.get(index++);
    }
}
Implementations are listed in META-INF/services/cn.sp.rpc.loadbalance.LoadBalance. The configuration class exposes a loadBalance property (default random).
@ConfigurationProperties(prefix = "sp.rpc")
public class RpcConfig {
    private String registerAddress = "127.0.0.1:2181";
    private Integer serverPort = 9999;
    private String protocol = "java";
    private String loadBalance = "random"; // default algorithm
    private Integer weight = 1;
    // getters & setters omitted
}
During auto‑configuration the appropriate implementation is loaded via ServiceLoader:
private LoadBalance getLoadBalance(String name) {
    ServiceLoader<LoadBalance> loader = ServiceLoader.load(LoadBalance.class);
    for (LoadBalance lb : loader) {
        LoadBalanceAno ano = lb.getClass().getAnnotation(LoadBalanceAno.class);
        Assert.notNull(ano, "load balance name can not be empty!");
        if (name.equals(ano.value())) {
            return lb;
        }
    }
    throw new RpcException("invalid load balance config");
}
Local Service‑List Cache
The client caches the list of provider addresses per service name in a static ConcurrentHashMap<String, List<Service>>. Cache look‑up is performed before querying Zookeeper.
public class ServerDiscoveryCache {
    private static final Map<String, List<Service>> SERVER_MAP = new ConcurrentHashMap<>();
    public static final List<String> SERVICE_CLASS_NAMES = new ArrayList<>();

    public static void put(String serviceName, List<Service> serviceList) {
        SERVER_MAP.put(serviceName, serviceList);
    }

    public static List<Service> get(String serviceName) {
        return SERVER_MAP.get(serviceName);
    }

    public static boolean isEmpty(String serviceName) {
        // single look-up: a concurrent removeAll between two gets could cause an NPE
        List<Service> services = SERVER_MAP.get(serviceName);
        return services == null || services.isEmpty();
    }

    public static void removeAll(String serviceName) {
        SERVER_MAP.remove(serviceName);
    }
}
If a provider node disappears (Zookeeper removes its ephemeral node), a child‑change listener clears the corresponding cache entry to avoid stale references.
public class ZkChildListenerImpl implements IZkChildListener {
    private static Logger logger = LoggerFactory.getLogger(ZkChildListenerImpl.class);

    @Override
    public void handleChildChange(String parentPath, List<String> childList) throws Exception {
        logger.debug("Child change parentPath:[{}] -- childList:[{}]", parentPath, childList);
        String[] arr = parentPath.split("/");
        // the path starts with "/", so arr[0] is empty and arr[2] holds the service name
        ServerDiscoveryCache.removeAll(arr[2]);
    }
}
Netty Client with Persistent TCP Connections
The client now keeps one long‑lived connection per provider address. Each address maps to a SendHandlerV2 instance stored in connectedServerNodes. Connections are established asynchronously on a bounded thread pool (4 core threads, at most 10).
public class NettyNetClient implements NetClient {
    private static Logger logger = LoggerFactory.getLogger(NettyNetClient.class);
    private static ExecutorService threadPool = new ThreadPoolExecutor(
            4, 10, 200, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(1000),
            new ThreadFactoryBuilder().setNameFormat("rpcClient-%d").build()
    );
    private EventLoopGroup loopGroup = new NioEventLoopGroup(4);
    public static Map<String, SendHandlerV2> connectedServerNodes = new ConcurrentHashMap<>();

    @Override
    public RpcResponse sendRequest(RpcRequest rpcRequest, Service service, MessageProtocol messageProtocol) {
        String address = service.getAddress();
        // intern() so that equal address strings share one monitor;
        // locking on the raw String only works when callers pass the same instance
        synchronized (address.intern()) {
            if (connectedServerNodes.containsKey(address)) {
                logger.info("Using existing connection");
                return connectedServerNodes.get(address).sendRequest(rpcRequest);
            }
            String[] parts = address.split(":");
            String host = parts[0];
            int port = Integer.parseInt(parts[1]);
            SendHandlerV2 handler = new SendHandlerV2(messageProtocol, address);
            threadPool.submit(() -> {
                Bootstrap b = new Bootstrap();
                b.group(loopGroup)
                        .channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) {
                                ch.pipeline().addLast(handler);
                            }
                        });
                ChannelFuture cf = b.connect(host, port);
                cf.addListener((ChannelFutureListener) future -> {
                    // cache the handler only if the connection was actually established
                    if (future.isSuccess()) {
                        connectedServerNodes.put(address, handler);
                    } else {
                        logger.error("connect to [{}] failed", address, future.cause());
                    }
                });
            });
            logger.info("Using new connection…");
            return handler.sendRequest(rpcRequest);
        }
    }
}
SendHandlerV2 – Netty Channel Logic
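The handler does not create the channel itself. It stores the channel that Netty hands to it and uses a CountDownLatch so that sendRequest can wait until that channel is ready. sendRequest then writes the marshalled request and blocks until the response arrives or the wait times out.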
public class SendHandlerV2 extends ChannelInboundHandlerAdapter {
    private static Logger logger = LoggerFactory.getLogger(SendHandlerV2.class);
    private static final int CHANNEL_WAIT_TIME = 4; // seconds
    private static final int RESPONSE_WAIT_TIME = 8; // seconds
    private volatile Channel channel;
    private final MessageProtocol messageProtocol;
    private final String remoteAddress;
    // pending requests keyed by request id, shared by all handler instances
    private static final Map<String, RpcFuture<RpcResponse>> requestMap = new ConcurrentHashMap<>();
    private CountDownLatch latch = new CountDownLatch(1);

    public SendHandlerV2(MessageProtocol mp, String addr) {
        this.messageProtocol = mp;
        this.remoteAddress = addr;
    }

    // channelActive instead of channelRegistered: a channel is registered before it is
    // connected, and a write issued in that window would fail
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        this.channel = ctx.channel();
        latch.countDown();
    }

    public RpcResponse sendRequest(RpcRequest request) {
        RpcFuture<RpcResponse> future = new RpcFuture<>();
        requestMap.put(request.getRequestId(), future);
        try {
            byte[] data = messageProtocol.marshallingRequest(request);
            ByteBuf buf = Unpooled.buffer(data.length);
            buf.writeBytes(data);
            if (latch.await(CHANNEL_WAIT_TIME, TimeUnit.SECONDS)) {
                channel.writeAndFlush(buf);
                return future.get(RESPONSE_WAIT_TIME, TimeUnit.SECONDS);
            } else {
                throw new RpcException("establish channel time out");
            }
        } catch (Exception e) {
            throw new RpcException(e.getMessage());
        } finally {
            // always drop the pending entry, whether the call succeeded or not
            requestMap.remove(request.getRequestId());
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf byteBuf = (ByteBuf) msg;
        byte[] resp = new byte[byteBuf.readableBytes()];
        byteBuf.readBytes(resp);
        ReferenceCountUtil.release(byteBuf);
        RpcResponse response = messageProtocol.unmarshallingResponse(resp);
        // match the response to its waiting caller by request id
        RpcFuture<RpcResponse> future = requestMap.get(response.getRequestId());
        if (future != null) {
            future.setResponse(response);
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        logger.error("channel inactive with remoteAddress:[{}]", remoteAddress);
        NettyNetClient.connectedServerNodes.remove(remoteAddress);
    }
}
RpcFuture – Blocking Future Implementation
public class RpcFuture<T> implements Future<T> {
    private T response;
    private CountDownLatch latch = new CountDownLatch(1);
    private long beginTime = System.currentTimeMillis();

    public void setResponse(T resp) {
        this.response = resp;
        latch.countDown();
    }

    @Override
    public T get() throws InterruptedException, ExecutionException {
        latch.await();
        return response;
    }

    @Override
    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        if (latch.await(timeout, unit)) {
            return response;
        }
        // follow the Future contract: signal a timeout instead of returning null
        throw new TimeoutException("no response within " + timeout + " " + unit);
    }
    // cancel, isCancelled, isDone omitted for brevity
}
Performance Evaluation
Test environment:
Intel(R) Core(TM) i5‑6300HQ, 4 cores
Windows 10 64‑bit
16 GB RAM
Steps:
Start a local Zookeeper instance.
Launch one consumer and two provider services using the round‑robin algorithm.
Run ab -c 4 -n 10000 http://localhost:8080/test/user?id=1 to issue 10 000 HTTP requests at a concurrency level of 4.
Result: the 10 000 requests completed in 11 seconds, a more than 10× speed‑up over the original implementation, which needed over 130 seconds.
Source Code
Repository URLs:
https://github.com/2YSP/rpc-spring-boot-starter
https://github.com/2YSP/rpc-example
Reference: https://www.cnblogs.com/itoak/p/13370031.html
Code Ape Tech Column
Former Ant Group P8 engineer, pure technologist, sharing full‑stack Java, job interview and career advice through a column. Site: java-family.cn
