Inside RocketMQ’s Remoting Module: Protocol, Encoding, and Async Communication Explained
This article dissects RocketMQ’s Remoting communication module, covering its architecture, class hierarchy, custom protocol design, message encoding/decoding, synchronous, asynchronous and one‑way communication modes, and the detailed client‑server interaction flow including timeout handling and callback execution.
Architecture Overview
RocketMQ’s messaging cluster relies on a dedicated Remoting module built on Netty (v4.0.42.Final). The module provides high‑performance network communication among the five core roles: NameServer, Broker‑Master, Broker‑Slave, Producer and Consumer.
Key Interfaces and Classes
RemotingService : defines start(), shutdown() and registerRPCHook(RPCHook).
RemotingClient / RemotingServer : extend RemotingService and expose client‑side and server‑side APIs.
NettyRemotingAbstract : abstract class that implements the logic shared by client and server: encoding/decoding RemotingCommand, request/response handling and flow control.
NettyRemotingClient and NettyRemotingServer : concrete Netty implementations used by all RocketMQ components.
Protocol Design
The binary protocol header consists of the following fields:
code // request/response type
language // client language
version // client version
opaque // request id, echoed back in the response so the two can be correlated
flag // bit flags marking request vs. response and oneway
remark // optional text
extFields // custom key-value map
On the wire the message is composed of four parts:
Total length (4 bytes, int).
Protocol type + header length (4 bytes, first byte = serialization type, next 3 bytes = header length).
Header data (JSON or other serialization).
Body data (optional binary payload).
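The four-part layout above can be sketched with plain java.nio, independent of RocketMQ. This is a minimal illustration, not the project's code: the class FrameLayoutSketch and all of its method names are hypothetical, and the header is treated as an opaque byte array.

```java
import java.nio.ByteBuffer;

// Illustrative sketch of the four-part frame; every name here is hypothetical.
final class FrameLayoutSketch {

    // Packs the serialization type (1 byte) and the header length (3 bytes) into one int.
    static int packTypeAndHeaderLength(byte serializeType, int headerLength) {
        if (headerLength < 0 || headerLength > 0xFFFFFF) {
            throw new IllegalArgumentException("header length must fit in 3 bytes");
        }
        return ((serializeType & 0xFF) << 24) | headerLength;
    }

    static byte serializeTypeOf(int packed) {
        return (byte) ((packed >>> 24) & 0xFF);
    }

    static int headerLengthOf(int packed) {
        return packed & 0xFFFFFF;
    }

    // Builds: total length | type + header length | header | body.
    static ByteBuffer buildFrame(byte serializeType, byte[] header, byte[] body) {
        int bodyLength = (body == null) ? 0 : body.length;
        // The total length counts everything after the first 4 bytes.
        int totalLength = 4 + header.length + bodyLength;
        ByteBuffer frame = ByteBuffer.allocate(4 + totalLength);
        frame.putInt(totalLength);
        frame.putInt(packTypeAndHeaderLength(serializeType, header.length));
        frame.put(header);
        if (body != null) {
            frame.put(body);
        }
        frame.flip();
        return frame;
    }

    // Reads a complete frame back and returns {header, body}.
    static byte[][] parseFrame(ByteBuffer frame) {
        int totalLength = frame.getInt();
        int packed = frame.getInt();
        byte[] header = new byte[headerLengthOf(packed)];
        frame.get(header);
        byte[] body = new byte[totalLength - 4 - header.length];
        frame.get(body);
        return new byte[][] {header, body};
    }
}
```

Because ByteBuffer writes integers big-endian by default, the packed int puts the serialization type in the first byte on the wire, followed by the three header-length bytes.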
Encoding Example
public ByteBuffer encode() {
    // 4 bytes for the "protocol type + header length" field
    int length = 4;
    byte[] headerData = this.headerEncode();
    length += headerData.length;
    if (this.body != null) {
        length += body.length;
    }
    // 4 more bytes up front for the total-length field, which does not count itself
    ByteBuffer result = ByteBuffer.allocate(4 + length);
    result.putInt(length);
    result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
    result.put(headerData);
    if (this.body != null) {
        result.put(this.body);
    }
    result.flip();
    return result;
}

private static byte[] markProtocolType(int source, SerializeType type) {
    byte[] result = new byte[4];
    // first byte: serialization type; next three bytes: header length, big-endian
    result[0] = type.getCode();
    result[1] = (byte) ((source >> 16) & 0xFF);
    result[2] = (byte) ((source >> 8) & 0xFF);
    result[3] = (byte) (source & 0xFF);
    return result;
}
Decoding Example
public static RemotingCommand decode(final ByteBuffer byteBuffer) {
    // The arithmetic below assumes the 4-byte total-length prefix has already been
    // stripped, so the buffer starts at the "protocol type + header length" field.
    int length = byteBuffer.limit();
    int oriHeaderLen = byteBuffer.getInt();
    int headerLength = getHeaderLength(oriHeaderLen);
    byte[] headerData = new byte[headerLength];
    byteBuffer.get(headerData);
    RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));
    // Whatever remains after the 4-byte field and the header is the body
    int bodyLength = length - 4 - headerLength;
    if (bodyLength > 0) {
        byte[] bodyData = new byte[bodyLength];
        byteBuffer.get(bodyData);
        cmd.body = bodyData;
    }
    return cmd;
}
Communication Modes
Sync : the caller blocks until a response is received.
Async : the caller supplies an InvokeCallback that is executed when the response arrives.
Oneway : fire‑and‑forget messages such as heartbeats.
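The receiver needs to know whether a frame is a request or a response and whether a reply is expected, and the flag header field carries that information. The sketch below shows one way to encode both facts as bits in a single int. The bit positions (bit 0 for response, bit 1 for oneway) and all names are assumptions made for illustration, not a statement of RocketMQ's exact constants.

```java
// Hypothetical helper showing how one int can carry "response" and "oneway" markers.
final class FlagBitsSketch {
    static final int RPC_TYPE_BIT = 0;   // assumed: bit 0 set means "this is a response"
    static final int RPC_ONEWAY_BIT = 1; // assumed: bit 1 set means "no reply expected"

    static int markResponse(int flag) {
        return flag | (1 << RPC_TYPE_BIT);
    }

    static int markOneway(int flag) {
        return flag | (1 << RPC_ONEWAY_BIT);
    }

    static boolean isResponse(int flag) {
        return (flag & (1 << RPC_TYPE_BIT)) != 0;
    }

    static boolean isOneway(int flag) {
        return (flag & (1 << RPC_ONEWAY_BIT)) != 0;
    }
}
```

With this scheme a server can skip writing a reply whenever isOneway(flag) is true, which is what makes a heartbeat fire-and-forget.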
Async Send (client side)
public void invokeAsyncImpl(final Channel channel,
                            final RemotingCommand request,
                            final long timeoutMillis,
                            final InvokeCallback invokeCallback)
        throws InterruptedException, RemotingTooMuchRequestException,
               RemotingTimeoutException, RemotingSendRequestException {
    final int opaque = request.getOpaque();
    if (semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS)) {
        SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(semaphoreAsync);
        ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis,
                invokeCallback, once);
        responseTable.put(opaque, responseFuture);
        try {
            channel.writeAndFlush(request).addListener(f -> {
                if (f.isSuccess()) {
                    responseFuture.setSendRequestOK(true);
                } else {
                    responseFuture.setSendRequestOK(false);
                    responseFuture.putResponse(null);
                    responseTable.remove(opaque);
                    try {
                        executeInvokeCallback(responseFuture);
                    } catch (Throwable e) {
                        log.warn("execute callback error", e);
                    } finally {
                        responseFuture.release();
                    }
                }
            });
        } catch (Exception e) {
            responseFuture.release();
            log.warn("send request error", e);
            throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
        }
    } else {
        throw new RemotingTimeoutException("invokeAsyncImpl tryAcquire timeout");
    }
}
}
Key points:
opaque is the request id used to correlate a response with its request.
semaphoreAsync caps the number of in-flight async requests (flow control). In the code above it is a field of the remoting instance, so the limit is shared by all channels rather than applied per channel.
A ResponseFuture stores the callback, the timeout and a one-time semaphore releaser.
The future is put into responseTable (a ConcurrentHashMap<Integer, ResponseFuture>).
If the write fails, the future is removed and the callback runs immediately. If it succeeds, the callback runs later, when the response arrives or the timeout scan expires the request.
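The flow-control idea in these key points can be reduced to a semaphore plus a release-at-most-once wrapper. The following is a minimal sketch under that assumption. AsyncPermitSketch and ReleaseOnce are hypothetical stand-ins, not RocketMQ's SemaphoreReleaseOnlyOnce or ResponseFuture.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: bound the number of in-flight requests with a semaphore.
final class AsyncPermitSketch {

    // Releases the wrapped semaphore at most once, however often release() is called.
    static final class ReleaseOnce {
        private final AtomicBoolean released = new AtomicBoolean(false);
        private final Semaphore semaphore;

        ReleaseOnce(Semaphore semaphore) {
            this.semaphore = semaphore;
        }

        void release() {
            if (released.compareAndSet(false, true)) {
                semaphore.release();
            }
        }
    }

    private final Semaphore permits;
    private final ConcurrentHashMap<Integer, ReleaseOnce> inFlight = new ConcurrentHashMap<>();

    AsyncPermitSketch(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight, true);
    }

    // Returns true if the request was admitted, false if no permit became free in time.
    boolean trySend(int opaque, long timeoutMillis) {
        try {
            if (!permits.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS)) {
                return false;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        inFlight.put(opaque, new ReleaseOnce(permits));
        return true;
    }

    // Called when a response, a write failure, or a timeout finishes the request.
    void complete(int opaque) {
        ReleaseOnce once = inFlight.remove(opaque);
        if (once != null) {
            once.release();
        }
    }

    int availablePermits() {
        return permits.availablePermits();
    }
}
```

The release-once wrapper matters because several paths (response handler, write-failure listener, timeout scan) may each try to return the permit for the same request. Without the guard the semaphore would drift above its intended limit.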
Server Request Processing
public void processRequestCommand(final ChannelHandlerContext ctx,
                                  final RemotingCommand cmd) {
    Pair<NettyRequestProcessor, ExecutorService> matched =
            processorTable.get(cmd.getCode());
    Pair<NettyRequestProcessor, ExecutorService> pair =
            matched != null ? matched : this.defaultRequestProcessor;
    final int opaque = cmd.getOpaque();
    if (pair != null) {
        Runnable run = () -> {
            try {
                RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook();
                if (rpcHook != null) {
                    rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
                }
                RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
                if (rpcHook != null) {
                    rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
                }
                if (!cmd.isOnewayRPC() && response != null) {
                    response.setOpaque(opaque);
                    response.markResponseType();
                    ctx.writeAndFlush(response);
                }
            } catch (Throwable t) {
                // error handling omitted for brevity
            }
        };
        RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
        try {
            pair.getObject2().submit(requestTask);
        } catch (RejectedExecutionException e) {
            // overload handling omitted for brevity
        }
    } else {
        log.warn("request code not supported: " + cmd.getCode());
        RemotingCommand response = RemotingCommand.createResponseCommand(
                RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED,
                "request code not supported");
        response.setOpaque(opaque);
        ctx.writeAndFlush(response);
    }
}
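}
The server maintains processorTable (Map<Integer, Pair<NettyRequestProcessor, ExecutorService>>), which maps a request code to a processor and the thread pool it runs on. A request whose code has no entry falls back to defaultRequestProcessor, and if that is also missing the server replies with REQUEST_CODE_NOT_SUPPORTED. Each request is wrapped in a RequestTask and submitted to the matched pool, so processing does not block the thread that read the request from the channel.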
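The lookup-with-fallback dispatch described above can be sketched without Netty. In this hedged example a processor is modeled as a Function<String, String> and the reply as a CompletableFuture. ProcessorTableSketch, Entry, register, registerDefault and dispatch are all hypothetical names chosen for illustration.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;

// Hypothetical sketch: route a request code to a processor and the executor that runs it.
final class ProcessorTableSketch {

    // Stand-in for Pair<processor, executor>.
    static final class Entry {
        final Function<String, String> processor;
        final ExecutorService executor;

        Entry(Function<String, String> processor, ExecutorService executor) {
            this.processor = processor;
            this.executor = executor;
        }
    }

    private final Map<Integer, Entry> table = new ConcurrentHashMap<>();
    private volatile Entry defaultEntry;

    void register(int code, Function<String, String> processor, ExecutorService executor) {
        table.put(code, new Entry(processor, executor));
    }

    void registerDefault(Function<String, String> processor, ExecutorService executor) {
        defaultEntry = new Entry(processor, executor);
    }

    // Looks up the code, falls back to the default entry, and runs the processor
    // on that entry's executor rather than on the calling thread.
    CompletableFuture<String> dispatch(int code, String request) {
        Entry matched = table.get(code);
        Entry entry = (matched != null) ? matched : defaultEntry;
        if (entry == null) {
            return CompletableFuture.completedFuture("request code not supported: " + code);
        }
        return CompletableFuture.supplyAsync(() -> entry.processor.apply(request), entry.executor);
    }
}
```

Giving each request code its own executor lets a slow category of requests fill its own queue without starving unrelated ones, which is presumably the motivation for pairing processors with pools.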
Client Response Handling
public void processResponseCommand(final ChannelHandlerContext ctx,
                                   final RemotingCommand cmd) {
    final int opaque = cmd.getOpaque();
    final ResponseFuture responseFuture = responseTable.get(opaque);
    if (responseFuture != null) {
        responseFuture.setResponseCommand(cmd);
        responseTable.remove(opaque);
        if (responseFuture.getInvokeCallback() != null) {
            executeInvokeCallback(responseFuture);
        } else {
            responseFuture.putResponse(cmd);
            responseFuture.release();
        }
    } else {
        log.warn("receive response, but not matched any request, " +
                RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
        log.warn(cmd.toString());
    }
}
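}
When a response arrives, the client extracts the opaque id, looks up the matching ResponseFuture in responseTable, stores the response and removes the entry. For an async call it then executes the user-provided InvokeCallback. For a sync call, putResponse wakes the blocked caller and release returns the semaphore permit if one was held. A response with no matching entry, for example one that arrives after its request has already timed out, is only logged.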
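The two completion paths can be illustrated with a latch for the blocking caller and a callback for the async one. This is a simplified sketch rather than RocketMQ's ResponseFuture: responses are plain strings, the class and method names are hypothetical, and the entry is removed atomically with remove instead of the get-then-remove sequence shown above.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch: correlate responses to pending calls by their opaque id.
final class ResponseCorrelationSketch {

    static final class PendingCall {
        private final CountDownLatch done = new CountDownLatch(1);
        private final Consumer<String> callback; // null for synchronous callers
        private volatile String response;

        PendingCall(Consumer<String> callback) {
            this.callback = callback;
        }

        void putResponse(String response) {
            this.response = response;
            done.countDown(); // wakes a caller blocked in waitResponse
        }

        // Blocks until a response arrives or the timeout elapses; null means no response.
        String waitResponse(long timeoutMillis) {
            try {
                done.await(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return response;
        }
    }

    private final ConcurrentHashMap<Integer, PendingCall> responseTable = new ConcurrentHashMap<>();

    PendingCall register(int opaque, Consumer<String> callback) {
        PendingCall call = new PendingCall(callback);
        responseTable.put(opaque, call);
        return call;
    }

    // Returns false when no pending call matches the opaque id (late or unknown response).
    boolean onResponse(int opaque, String response) {
        PendingCall call = responseTable.remove(opaque);
        if (call == null) {
            return false;
        }
        if (call.callback != null) {
            call.callback.accept(response);
        } else {
            call.putResponse(response);
        }
        return true;
    }
}
```

A synchronous caller would call register(opaque, null), send the request, and then block in waitResponse. An async caller passes a callback and returns immediately.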
Timeout Scanning
public void scanResponseTable() {
    List<ResponseFuture> timeoutFutures = new LinkedList<>();
    Iterator<Map.Entry<Integer, ResponseFuture>> it = responseTable.entrySet().iterator();
    while (it.hasNext()) {
        Map.Entry<Integer, ResponseFuture> entry = it.next();
        ResponseFuture rf = entry.getValue();
        if (rf.getBeginTimestamp() + rf.getTimeoutMillis() + 1000 <= System.currentTimeMillis()) {
            rf.release();
            it.remove();
            timeoutFutures.add(rf);
            log.warn("remove timeout request, " + rf);
        }
    }
    for (ResponseFuture rf : timeoutFutures) {
        try {
            executeInvokeCallback(rf);
        } catch (Throwable e) {
            log.warn("scanResponseTable callback error", e);
        }
    }
}
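}
The client schedules this task every second. It removes futures whose deadline (start time plus timeout, plus a one-second grace period) has passed, releases their semaphore permits and then invokes each callback. In the code shown, the callback is handed the ResponseFuture itself with no response attached, which is how it can tell that the call timed out rather than completed.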
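The scan logic is easy to test once the clock is passed in as a parameter. The sketch below assumes that simplification. TimeoutScanSketch and its members are hypothetical names, and it returns the expired ids instead of invoking callbacks.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: periodically evict pending requests whose deadline has passed.
final class TimeoutScanSketch {

    static final class Pending {
        final long beginMillis;
        final long timeoutMillis;

        Pending(long beginMillis, long timeoutMillis) {
            this.beginMillis = beginMillis;
            this.timeoutMillis = timeoutMillis;
        }

        // Expired once start + timeout + grace is at or before "now".
        boolean isExpired(long nowMillis, long graceMillis) {
            return beginMillis + timeoutMillis + graceMillis <= nowMillis;
        }
    }

    private final ConcurrentHashMap<Integer, Pending> table = new ConcurrentHashMap<>();

    void add(int opaque, long beginMillis, long timeoutMillis) {
        table.put(opaque, new Pending(beginMillis, timeoutMillis));
    }

    // Removes expired entries and returns their ids so the caller can notify callbacks
    // after the iteration, outside the removal loop.
    List<Integer> scan(long nowMillis, long graceMillis) {
        List<Integer> expired = new ArrayList<>();
        Iterator<Map.Entry<Integer, Pending>> it = table.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Integer, Pending> entry = it.next();
            if (entry.getValue().isExpired(nowMillis, graceMillis)) {
                Integer opaque = entry.getKey();
                it.remove();
                expired.add(opaque);
            }
        }
        return expired;
    }

    int size() {
        return table.size();
    }
}
```

In a running client, a method like scan could be driven by ScheduledExecutorService.scheduleAtFixedRate with System.currentTimeMillis() as the clock. Collecting the expired entries first and notifying afterwards keeps slow callbacks from delaying the removal pass.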
Summary
RocketMQ’s Remoting module separates concerns cleanly: a lightweight binary protocol, Netty‑based transport, pluggable request processors, and a robust asynchronous request/response mechanism built on ResponseFuture and semaphores. Understanding the flow of RemotingCommand encoding, async send, server processing, and client response handling provides a solid foundation for extending or troubleshooting RocketMQ’s networking layer.
Programmer DD
A tinkering programmer and author of "Spring Cloud Microservices in Action"
