Building a Custom RPC Framework with Netty: A Step‑by‑Step Guide
This article introduces the fundamentals and key features of Remote Procedure Call (RPC), explores common use cases and popular frameworks, then walks through the design and implementation of a custom RPC system using Netty, covering protocol design, client‑server communication, serialization, encoding, decoding, and result handling.
1. RPC Overview
Remote Procedure Call (RPC) is a communication technique that allows a program to request a service from a remote computer without dealing with network details. It is widely used in distributed systems to simplify client‑server interaction. This article shows how to build a simple custom RPC framework based on Netty.
1.1 Main Features of RPC
Transparency: the caller invokes remote services as if they were local APIs.
Client‑Server model: the client sends a request to a server, which processes it and returns a result.
Serialization/Deserialization: request parameters are serialized into a byte stream for network transmission and deserialized on the server side.
Synchronous and asynchronous calls: synchronous calls block until a result is returned; asynchronous calls return immediately and use callbacks.
Error handling: RPC must handle network failures, timeouts, service crashes, scaling events, etc., while keeping these details transparent to the caller.
Protocol and transport: RPC can be built on various protocols (HTTP, TCP, etc.). This article uses a custom TCP‑based protocol.
1.2 Application Scenarios
Distributed systems (e.g., micro‑service architectures).
Client‑server applications such as mobile apps communicating with back‑end services.
Cross‑platform calls between different technology stacks.
Public APIs (payment gateways, authentication services, etc.).
Big‑data processing frameworks where nodes need frequent coordination (e.g., Hadoop, Spark).
Cloud computing environments with services spread across VMs or containers.
Cross‑network service calls across data centers or regions.
1.3 Common RPC Frameworks
JSF – JD.com’s open‑source distributed service framework.
gRPC – High‑performance RPC built on HTTP/2 and Protocol Buffers (Google).
Dubbo – Lightweight Java RPC framework with load balancing, service registration, and fault tolerance.
JSON‑RPC – RPC protocol that encodes calls and results in JSON.
Apache Thrift – Multi‑language RPC framework originally developed by Facebook.
2. Implementing a Custom RPC
To implement a custom RPC framework, the following core problems must be solved:
Client invocation: generate a local proxy (stub) that converts method calls into RPC requests.
Parameter serialization: serialize the method arguments into bytes.
Server data reception: deserialize the received bytes back into original parameters.
Remote execution: invoke the actual service method on the server.
Result return: serialize the result and send it back to the client.
Client result handling: deserialize the response and return it to the original caller.
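The six steps above can be traced end to end inside one process. The following is a minimal sketch, not the framework's code: it assumes the JDK's `java.lang.reflect.Proxy` and Java serialization, replaces the network with a direct method call, and the names `MiniRpc`, `Greeter`, and `GreeterImpl` are invented for the illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Hypothetical service interface and implementation, used only in this sketch.
interface Greeter {
    String greet(String name);
}

class GreeterImpl implements Greeter {
    @Override
    public String greet(String name) {
        return "hello, " + name;
    }
}

final class MiniRpc {
    // Parameter serialization / result return: object to bytes.
    static byte[] serialize(Object obj) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buf)) {
            out.writeObject(obj);
        }
        return buf.toByteArray();
    }

    // Server data reception / client result handling: bytes to object.
    static Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        }
    }

    // "Server" side: decode the request, invoke the real method, encode the result.
    static byte[] serve(Object target, byte[] requestBytes) throws Exception {
        Object[] request = (Object[]) deserialize(requestBytes);
        String methodName = (String) request[0];
        Class<?>[] paramTypes = (Class<?>[]) request[1];
        Object[] args = (Object[]) request[2];
        Method method = target.getClass().getMethod(methodName, paramTypes);
        return serialize(method.invoke(target, args));
    }

    // Client invocation: a stub that turns each method call into a request.
    static <T> T proxy(Class<T> iface, Object target) {
        return iface.cast(Proxy.newProxyInstance(
                iface.getClassLoader(),
                new Class<?>[] {iface},
                (proxyObj, method, args) -> {
                    Object[] request = {method.getName(), method.getParameterTypes(), args};
                    // The direct call to serve() stands in for the network round trip.
                    byte[] responseBytes = serve(target, serialize(request));
                    return deserialize(responseBytes);
                }));
    }
}
```

Calling `MiniRpc.proxy(Greeter.class, new GreeterImpl()).greet("rpc")` goes through every step, although no socket is involved. Sending the declared parameter types with the request is a choice made for this sketch only.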
2.1 Custom Communication Protocol
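The protocol consists of a 16‑byte header followed by a body. In the order the encoder writes them, the header fields are: a magic number (2 bytes), a sign byte that carries the message type (request/response/heartbeat) together with the serializer code, a status code (1 byte), the request ID (8 bytes), and the body length (4 bytes).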
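To make the layout concrete, here is a small sketch that writes and reads such a header with the JDK's `ByteBuffer`. The field order and widths follow the encoder shown later in the article; the magic value `0xBABE` and the class name `HeaderLayout` are assumptions, since the article does not state the real constant.

```java
import java.nio.ByteBuffer;

final class HeaderLayout {
    static final int HEADER_SIZE = 16;
    // Assumed magic value for illustration; the real constant is not given in the article.
    static final short MAGIC = (short) 0xBABE;

    // Offsets: magic 0..1, sign 2, status 3, request ID 4..11, body length 12..15.
    static byte[] encode(byte sign, byte status, long requestId, byte[] body) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_SIZE + body.length); // big-endian by default
        buf.putShort(MAGIC)
           .put(sign)
           .put(status)
           .putLong(requestId)
           .putInt(body.length)
           .put(body);
        return buf.array();
    }

    static boolean hasMagic(byte[] frame) {
        return ByteBuffer.wrap(frame).getShort(0) == MAGIC;
    }

    static long requestId(byte[] frame) {
        return ByteBuffer.wrap(frame).getLong(4);
    }

    static int bodyLength(byte[] frame) {
        return ByteBuffer.wrap(frame).getInt(12);
    }
}
```

A frame carrying a 3-byte body is therefore 19 bytes long, and the receiver can find the body length at a fixed offset before it has read any of the body.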
2.2 Client Invocation
2.2.1 Client Usage
// Interface method
ShoppingCart shopping(String pin);
// Create a proxy instance
IShoppingCartService serviceProxy = ProxyFactory.factory(IShoppingCartService.class)
        .setSerializerType(SerializerType.JDK)
        .newProxyInstance();
// Invoke as a local method
ShoppingCart result = serviceProxy.shopping("userPin");
log.info("result={}", JSONObject.toJSONString(result));
2.2.2 Core Functions of Proxy Factory
public class ProxyFactory<I> {
    // ... omitted ...
    /**
     * Create proxy instance
     */
    public I newProxyInstance() {
        ServiceData serviceData = new ServiceData(group, providerName,
                StringUtils.isNotBlank(version) ? version : "1.0.0");
        Caller caller = newCaller().timeoutMillis(timeoutMillis);
        // this.strategy is the configured field; the local variable holds the derived strategy.
        Strategy strategy = StrategyConfigContext.of(this.strategy, retries);
        Object handler;
        // Pick the call handler that matches the configured invocation style.
        switch (invokeType) {
            case "syncCall":
                handler = new SyncCaller(serviceData, caller);
                break;
            case "asyncCall":
                handler = new AsyncCaller(client.appName(), serviceData, caller, strategy);
                break;
            default:
                throw new RuntimeException("Unknown invoke type: " + invokeType);
        }
        return ProxyEnum.getDefault().newProxy(interfaceClass, handler);
    }
    // ... omitted ...
}
2.2.3 SyncCaller Implementation
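The caller in this section either blocks on a future or hands the future back to the application. A minimal sketch of that choice, with the JDK's `CompletableFuture` standing in for the article's own `Future` type (which is not shown); `CallModes` and `remoteCall` are hypothetical names, and the "remote" work is simulated locally.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class CallModes {
    // Stand-in for sending a request; a real caller would complete the
    // future when the response arrives from the network.
    static CompletableFuture<String> remoteCall(String arg) {
        return CompletableFuture.supplyAsync(() -> "cart-of-" + arg);
    }

    // Synchronous style: block until the result arrives or the timeout expires.
    static String callSync(String arg, long timeoutMillis) {
        try {
            return remoteCall(arg).get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("remote call failed", e);
        }
    }

    // Asynchronous style: return immediately; the caller attaches a callback or joins later.
    static CompletableFuture<String> callAsync(String arg) {
        return remoteCall(arg);
    }
}
```

With the asynchronous form, the application can chain work such as `callAsync("a").thenApply(String::toUpperCase)` without blocking the calling thread.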
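import java.lang.reflect.Method;
import net.bytebuddy.implementation.bind.annotation.AllArguments;
import net.bytebuddy.implementation.bind.annotation.Origin;
import net.bytebuddy.implementation.bind.annotation.RuntimeType;

public class SyncCaller extends AbstractCaller {
    // @Origin binds the invoked Method and @AllArguments its arguments (Byte Buddy method delegation).
    @RuntimeType
    public Object syncCall(@Origin Method method, @AllArguments @RuntimeType Object[] args) throws Throwable {
        StarGateRequest request = createRequest(method.getName(), args);
        Invoker invoker = new FastFailInvoker();
        Future<?> future = invoker.invoke(request, method.getReturnType());
        // Synchronous mode blocks for the result; otherwise the future is handed to the caller.
        if (sync) {
            return future.getResult();
        } else {
            return future;
        }
    }
}
2.2.4 Parameter Serialization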
// Invoker: serialize the message, then write it to the selected channel.
public <T> Future<T> invoke(StarGateRequest request, Class<T> returnType) throws Exception {
    Serializer serializer = serializer();
    Message message = request.message();
    Channel channel = selectChannel(message.getMetadata());
    byte code = serializer.code();
    byte[] bytes = serializer.writeObject(message);
    // The serializer code travels with the bytes so the server can pick the same serializer.
    request.bytes(code, bytes);
    return write(channel, request, returnType);
}

// JDK serializer: object to bytes via ObjectOutputStream.
public <T> byte[] writeObject(T obj) {
    ByteArrayOutputStream buf = OutputStreams.getByteArrayOutputStream();
    try (ObjectOutputStream output = new ObjectOutputStream(buf)) {
        output.writeObject(obj);
        output.flush();
        return buf.toByteArray();
    } catch (IOException e) {
        ThrowUtil.throwException(e);
    } finally {
        OutputStreams.resetBuf(buf);
    }
    return null; // reached only if throwException returns normally
}
2.2.4.1 Netty Message Encoder
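The encoder in this section packs the serializer code and the message type into one `sign` byte through `StarGateProtocolHeader.toSign`, whose body the article does not show. One common layout, assumed here and not confirmed by the source, puts the serializer code in the high four bits and the message type in the low four bits. The constant values and the class name `SignByte` are likewise hypothetical.

```java
final class SignByte {
    // Assumed message-type constants; the framework's real values are not shown.
    static final byte REQUEST = 0x01;
    static final byte RESPONSE = 0x02;

    // Both inputs must fit in four bits (0..15) for the packing to be reversible.
    static byte toSign(byte serializerCode, byte messageCode) {
        return (byte) ((serializerCode << 4) | (messageCode & 0x0f));
    }

    // Low four bits: message type.
    static byte messageCode(byte sign) {
        return (byte) (sign & 0x0f);
    }

    // High four bits: serializer code. Masking with 0xff avoids sign extension.
    static byte serializerCode(byte sign) {
        return (byte) ((sign & 0xff) >>> 4);
    }
}
```

Under this layout a single header byte is enough for the decoder to know both how to route the message and how to deserialize its body.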
public class StarGateEncoder extends MessageToByteEncoder<Payload> {
    // The required encode(...) override is not shown in the article.
    private void doEncodeRequest(RequestPayload request, ByteBuf out) {
        byte sign = StarGateProtocolHeader.toSign(request.serializerCode(), StarGateProtocolHeader.REQUEST);
        long invokeId = request.invokeId();
        byte[] bytes = request.bytes();
        int length = bytes.length;
        out.writeShort(StarGateProtocolHeader.Head) // magic number, 2 bytes
                .writeByte(sign)                    // serializer code + message type, 1 byte
                .writeByte(0x00)                    // status, 1 byte (always 0 for requests)
                .writeLong(invokeId)                // request ID, 8 bytes
                .writeInt(length)                   // body length, 4 bytes
                .writeBytes(bytes);                 // body
    }
}
2.3 Server Receiving Data
2.3.1 Message Decoder
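TCP delivers a byte stream, so a header or its body may arrive split across several reads; this is why the decoder in this section is a state machine with checkpoints. The same idea can be sketched without Netty: buffer incoming bytes until a whole frame is available, then emit its body. The sketch assumes the 16-byte header of section 2.1 with the body length at offset 12; `FrameAccumulator` is a hypothetical name, and magic-number and size-limit checks are left out for brevity.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

final class FrameAccumulator {
    private static final int HEADER_SIZE = 16;
    private static final int BODY_LENGTH_OFFSET = 12;

    // Bytes received so far that do not yet form a complete frame.
    private ByteBuffer pending = ByteBuffer.allocate(0);

    // Feed one chunk as read from the socket; returns the bodies of all frames completed by it.
    List<byte[]> feed(byte[] chunk) {
        ByteBuffer merged = ByteBuffer.allocate(pending.remaining() + chunk.length);
        merged.put(pending).put(chunk);
        merged.flip();

        List<byte[]> bodies = new ArrayList<>();
        while (merged.remaining() >= HEADER_SIZE) {
            int bodyLength = merged.getInt(merged.position() + BODY_LENGTH_OFFSET);
            if (bodyLength < 0) {
                throw new IllegalStateException("negative body length: " + bodyLength);
            }
            if (merged.remaining() - HEADER_SIZE < bodyLength) {
                break; // header is complete, body is not: wait for more bytes
            }
            merged.position(merged.position() + HEADER_SIZE); // skip the header
            byte[] body = new byte[bodyLength];
            merged.get(body);
            bodies.add(body);
        }
        pending = merged.slice(); // keep the unconsumed tail for the next chunk
        return bodies;
    }
}
```

Netty's `ReplayingDecoder` achieves the same effect differently: when a read runs past the available bytes, `decode` is invoked again later and resumes from the last checkpoint.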
public class StarGateDecoder extends ReplayingDecoder<StarGateDecoder.State> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // Each case falls through to the next on purpose; checkpoint() records progress
        // so that a replay after a partial read resumes at the right field.
        switch (state()) {
            case HEAD:
                checkMagic(in.readShort());
                checkpoint(State.SIGN);
            case SIGN:
                header.sign(in.readByte());
                checkpoint(State.STATUS);
            case STATUS:
                header.status(in.readByte());
                checkpoint(State.ID);
            case ID:
                header.id(in.readLong());
                checkpoint(State.BODY_SIZE);
            case BODY_SIZE:
                header.bodySize(in.readInt());
                checkpoint(State.BODY);
            case BODY:
                if (header.messageCode() == StarGateProtocolHeader.REQUEST) {
                    int length = checkBodySize(header.bodySize());
                    byte[] bytes = new byte[length];
                    in.readBytes(bytes);
                    RequestPayload request = new RequestPayload(header.id());
                    request.bytes(header.serializerCode(), bytes);
                    out.add(request);
                } else {
                    throw new Exception("Illegal sign");
                }
                // Frame complete: start over at the next header.
                checkpoint(State.HEAD);
        }
    }
}
2.3.2 Parameter Deserialization
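The handler in this section picks a serializer from the code carried in the header, so both sides must agree on the mapping from code to serializer. A minimal sketch of such a registry with a single JDK-serialization entry follows. The names `ByteSerializer`, `JdkSerializer`, `SerializerRegistry`, and the code value `0x01` are assumptions for illustration, not the framework's actual `SerializerFactory`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

interface ByteSerializer {
    byte code();
    byte[] writeObject(Object obj);
    <T> T readObject(byte[] bytes, Class<T> type);
}

final class JdkSerializer implements ByteSerializer {
    static final byte CODE = 0x01; // assumed value

    @Override
    public byte code() {
        return CODE;
    }

    @Override
    public byte[] writeObject(Object obj) {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buf)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buf.toByteArray();
    }

    @Override
    public <T> T readObject(byte[] bytes, Class<T> type) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return type.cast(in.readObject()); // fails fast if the payload has another type
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("class missing on the receiving side", e);
        }
    }
}

final class SerializerRegistry {
    private static final Map<Byte, ByteSerializer> BY_CODE = new ConcurrentHashMap<>();

    static {
        register(new JdkSerializer());
    }

    static void register(ByteSerializer serializer) {
        BY_CODE.put(serializer.code(), serializer);
    }

    // Looks up the serializer named by the header's code; unknown codes are rejected.
    static ByteSerializer get(byte code) {
        ByteSerializer serializer = BY_CODE.get(code);
        if (serializer == null) {
            throw new IllegalArgumentException("unknown serializer code: " + code);
        }
        return serializer;
    }
}
```

JDK serialization is convenient for a demonstration, but deserializing bytes from untrusted peers with `ObjectInputStream` is unsafe, which is one reason frameworks usually offer other serializers behind the same interface.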
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof RequestPayload) {
        StarGateRequest request = new StarGateRequest((RequestPayload) msg);
        // Use the serializer named in the header to rebuild the message.
        byte code = request.serializerCode();
        Serializer serializer = SerializerFactory.getSerializer(code);
        byte[] bytes = request.bytes();
        Message message = serializer.readObject(bytes, Message.class);
        request.message(message);
        process(message);
    } else {
        ReferenceCountUtil.release(msg);
    }
}
2.3.3 Process Client Request
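The handler in this section looks up the target method using the runtime classes of the arguments. That lookup misses methods whose parameters are declared as primitives or interfaces (an `Integer` argument does not match an `int` parameter in `getMethod`) and throws on `null` arguments. One possible alternative, sketched under the assumption that ambiguous overloads are rare, matches by name, argument count, and assignability; `MethodResolver` is a hypothetical helper, not part of the framework.

```java
import java.lang.reflect.Method;
import java.util.Map;

final class MethodResolver {
    // Primitive parameter types and the wrapper classes their arguments arrive as.
    private static final Map<Class<?>, Class<?>> WRAPPERS = Map.of(
            int.class, Integer.class, long.class, Long.class,
            boolean.class, Boolean.class, double.class, Double.class,
            float.class, Float.class, short.class, Short.class,
            byte.class, Byte.class, char.class, Character.class);

    // Returns the first public method with this name whose parameters accept the arguments.
    static Method resolve(Class<?> clazz, String name, Object[] args) {
        int argCount = args == null ? 0 : args.length;
        for (Method m : clazz.getMethods()) {
            if (m.getName().equals(name)
                    && m.getParameterCount() == argCount
                    && accepts(m.getParameterTypes(), args)) {
                return m;
            }
        }
        throw new IllegalArgumentException(
                "no public method " + name + " accepting " + argCount + " argument(s)");
    }

    private static boolean accepts(Class<?>[] paramTypes, Object[] args) {
        for (int i = 0; i < paramTypes.length; i++) {
            Class<?> param = paramTypes[i];
            Object arg = args[i];
            if (arg == null) {
                if (param.isPrimitive()) {
                    return false; // null cannot be passed for a primitive parameter
                }
            } else if (!WRAPPERS.getOrDefault(param, param).isInstance(arg)) {
                return false;
            }
        }
        return true;
    }
}
```

For example, `MethodResolver.resolve(String.class, "charAt", new Object[] {1})` finds `charAt(int)`, which a lookup by `Integer.class` would not. When several overloads accept the same arguments, this sketch returns whichever it encounters first; transmitting the declared parameter types in the request avoids that ambiguity.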
public void process(Message message) {
    try {
        ServiceMetadata metadata = message.getMetadata();
        String providerName = metadata.getProviderName();
        // Map the service name to its implementation class name.
        providerName = findServiceImpl(providerName);
        String methodName = message.getMethodName();
        Object[] args = message.getArgs();
        ClassLoader cl = Thread.currentThread().getContextClassLoader();
        Class<?> clazz = cl.loadClass(providerName);
        Object instance = clazz.getDeclaredConstructor().newInstance();
        // Parameter types are taken from the runtime classes of the arguments,
        // so the declared parameter types must match them exactly.
        Class<?>[] paramTypes = new Class[args.length];
        for (int i = 0; i < args.length; i++) {
            paramTypes[i] = args[i].getClass();
        }
        Method method = clazz.getMethod(methodName, paramTypes);
        Object invokeResult = method.invoke(instance, args);
        doProcess(invokeResult);
    } catch (Exception e) {
        log.error("Invocation failed:", e);
        throw new RuntimeException(e);
    }
}
2.3.4 Return Call Result
private void doProcess(Object realResult) {
    ResultWrapper result = new ResultWrapper();
    result.setResult(realResult);
    // Reply with the same serializer and invoke ID the request used.
    byte code = request.serializerCode();
    Serializer serializer = SerializerFactory.getSerializer(code);
    Response response = new Response(request.invokeId());
    byte[] bytes = serializer.writeObject(result);
    response.bytes(code, bytes);
    response.status(Status.OK.value());
    channel.writeAndFlush(response).addListener((ChannelFutureListener) future -> {
        if (future.isSuccess()) {
            log.info("Response sent");
        } else {
            log.error("Failed to send response, channel: {}, cause: {}.", channel, future.cause());
        }
    });
}
2.4 Client Receiving Call Result
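The handler in this section matches each response to its waiting caller through `FUTURES_MAP`, keyed by the invoke ID from the header. A minimal sketch of that correlation, with `CompletableFuture` standing in for the article's `DefaultFuture`; `PendingCalls` and its method names are hypothetical. It also covers the case where no future is found, for example because a timeout already removed it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

final class PendingCalls {
    private final ConcurrentMap<Long, CompletableFuture<Object>> futures = new ConcurrentHashMap<>();
    private final AtomicLong nextId = new AtomicLong();

    // Allocate the ID that will be written into the request header.
    long newInvokeId() {
        return nextId.incrementAndGet();
    }

    // Called before the request is written: park a future under the request's ID.
    CompletableFuture<Object> register(long invokeId) {
        CompletableFuture<Object> future = new CompletableFuture<>();
        futures.put(invokeId, future);
        return future;
    }

    // Called by the response handler. Returns false when nobody is waiting for this ID.
    boolean complete(long invokeId, boolean ok, Object result) {
        CompletableFuture<Object> future = futures.remove(invokeId);
        if (future == null) {
            return false;
        }
        if (ok) {
            future.complete(result);
        } else {
            future.completeExceptionally(new IllegalStateException("remote call failed: " + result));
        }
        return true;
    }
}
```

Because the entry is removed when the response arrives, a duplicate or late response for the same ID is simply reported as unmatched instead of completing a future twice.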
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof ResponseMessage) {
        ResponseMessage responseMessage = (ResponseMessage) msg;
        StarGateResponse response = new StarGateResponse(responseMessage.getMsg());
        byte code = response.serializerCode();
        Serializer serializer = SerializerFactory.getSerializer(code);
        byte[] bytes = responseMessage.bytes();
        Result result = serializer.readObject(bytes, Result.class);
        response.result(result);
        // Look up the caller's pending future by invoke ID.
        long invokeId = response.id();
        DefaultFuture<?> future = FUTURES_MAP.remove(invokeId);
        if (future == null) {
            return; // no caller is waiting for this ID
        }
        if (response.status() == Status.OK.value()) {
            future.complete((V) response.getResult());
        } else {
            // handle error
        }
    } else {
        ReferenceCountUtil.release(msg);
    }
}
3. Conclusion
The article first introduced RPC concepts, application scenarios, and common frameworks, then demonstrated how to implement a basic custom RPC framework to give readers a clear idea of the underlying mechanics and how Netty can be used for efficient network programming. Production‑grade RPC systems require additional features such as rate limiting, load balancing, circuit breaking, generic calls, auto‑reconnect, and extensible interceptors.
Typical RPC architectures involve three roles: service provider, service consumer, and registry. This tutorial omitted the registry implementation and assumed that providers have already registered their services and that consumers have established long‑lived TCP connections to them.
Future articles will cover registry design, service discovery, and automatic registration.
JD Cloud Developers
JD Cloud Developers (Developer of JD Technology) is a JD Technology Group platform offering technical sharing and communication for AI, cloud computing, IoT and related developers. It publishes JD product technical information, industry content, and tech event news. Embrace technology and partner with developers to envision the future.
