Analysis of Java NIO and Tars Framework Network Programming
The article explains Java NIO’s non‑blocking channels, buffers, and selectors, then shows how the open‑source Tars RPC framework builds a multi‑reactor, multi‑thread network model on top of NIO—detailing server socket setup, event dispatch, session management, and read/write processing.
This article, authored by the vivo Internet Server Team, introduces the fundamentals of Java NIO and analyzes how the Tars framework utilizes NIO for network programming.
1. Tars Framework Overview
Tars is an open‑source, high‑performance RPC framework originally developed by Tencent. It supports multiple languages (C++, Java, PHP, Node.js, Go) and provides a complete solution for development, operation, and testing, including protocol codec, RPC communication, service discovery, monitoring, logging, and configuration management.
2. Java NIO Principles
Java NIO (New I/O) is a non‑blocking, buffer‑oriented API introduced in JDK 1.4. It provides channels, buffers, selectors and supports I/O multiplexing.
2.1 Channels and Buffers
Four main channel types are used:
FileChannel – file read/write.
DatagramChannel – UDP communication.
SocketChannel – TCP communication.
ServerSocketChannel – listens for incoming TCP connections.
Typical usage of a ServerSocketChannel in a loop:
while(true){
SocketChannel socketChannel = serverSocketChannel.accept();
// do something with socketChannel...
}Configuring a non‑blocking ServerSocketChannel :
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(8888));
serverSocketChannel.configureBlocking(false);
while(true){
SocketChannel socketChannel = serverSocketChannel.accept();
if(socketChannel != null){
// do something with socketChannel...
}
}Buffers such as ByteBuffer , CharBuffer , etc., are allocated with ByteBuffer.allocate(2048) and follow a four‑step workflow: write → flip → read → clear/compact.
2.2 Selector
A Selector allows a single thread to monitor multiple channels. Channels are registered with interest sets (e.g., SelectionKey.OP_READ ) and the selector blocks on select() until an event is ready.
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);The SelectionKey provides interestOps() , readyOps() , and helper methods such as isAcceptable() , isReadable() , etc.
3. Tars NIO Network Programming
The Tars network model follows a multi‑reactor, multi‑thread architecture. A typical server workflow includes:
Create a non‑blocking ServerSocketChannel and bind a port.
Create a Selector .
Register OP_ACCEPT on the server channel.
Loop: call select() , dispatch events, handle accept/read/write.
Typical client initialization steps involve creating a Communicator , obtaining a ServantProxy , and initializing the selector manager.
Server startup example (binding and selector registration):
public void bind(AppService appService) throws IOException {
// ... omitted non‑essential code
if (endpoint.type().equals("tcp")) { // 1
this.selectorManager = new SelectorManager(Utils.getSelectorPoolSize(), new ServantProtocolFactory(codec), threadPool, processor, keepAlive, "server-tcp-reactor", false); // 2
this.selectorManager.setTcpNoDelay(serverCfg.isTcpNoDelay());
this.selectorManager.start();
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.socket().bind(new InetSocketAddress(endpoint.host(), endpoint.port()), 1024); // 3
serverChannel.configureBlocking(false);
selectorManager.getReactor(0).registerChannel(serverChannel, SelectionKey.OP_ACCEPT); // 4
} else if (endpoint.type().equals("udp")) {
// UDP handling omitted for brevity
}
}Reactor thread run loop:
public void run() {
while (!Thread.interrupted()) {
selector.select(); // 1
processRegister(); // 2
Iterator
iter = selector.selectedKeys().iterator(); // 3
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (!key.isValid()) continue;
try {
if (key.attachment() != null && key.attachment() instanceof Session) {
((Session) key.attachment()).updateLastOperationTime(); // 4
}
dispatchEvent(key); // 5
} catch (Throwable ex) {
disConnectWithException(key, ex);
}
}
processUnRegister(); // 6
}
}Accept event handling creates a TCPSession , configures the channel, and registers it for read events:
public void handleAcceptEvent(SelectionKey key) throws IOException {
ServerSocketChannel server = (ServerSocketChannel) key.channel(); // 1
SocketChannel channel = server.accept();
channel.socket().setTcpNoDelay(selectorManager.isTcpNoDelay());
channel.configureBlocking(false);
Utils.setQosFlag(channel.socket());
TCPSession session = new TCPSession(selectorManager); // 2
session.setChannel(channel);
session.setStatus(SessionStatus.SERVER_CONNECTED);
session.setKeepAlive(selectorManager.isKeepAlive());
session.setTcpNoDelay(selectorManager.isTcpNoDelay());
SessionManager.getSessionManager().registerSession(session); // 3
selectorManager.nextReactor().registerChannel(channel, SelectionKey.OP_READ, session); // 4
}Read and write events delegate to session.read() and session.doWrite() . The read logic distinguishes client‑side responses and server‑side requests, decoding them via the Tars protocol decoder and dispatching to a thread pool.
Example of the read method:
protected void read() throws IOException {
int ret = readChannel();
if (this.status == SessionStatus.CLIENT_CONNECTED) {
readResponse();
} else if (this.status == SessionStatus.SERVER_CONNECTED) {
readRequest();
} else {
throw new IllegalStateException("The current session status is invalid. [status:" + this.status + "]");
}
if (ret < 0) {
close();
return;
}
}The corresponding readResponse() and readRequest() methods decode protocol messages and submit them to worker threads for processing.
Write handling enqueues buffers into a bounded queue and triggers the selector’s write interest:
protected void write(IoBuffer buffer) throws IOException {
if (buffer == null) return;
if (channel == null || key == null) throw new IOException("Connection is closed");
if (!this.queue.offer(buffer.buf())) {
throw new IOException("The session queue is full. [ queue size:" + queue.size() + " ]");
}
if (key != null) {
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
key.selector().wakeup();
}
}4. Summary
The article covers Java NIO basics and provides a detailed source‑code analysis of the Tars‑Java 1.7.2 network module. Although newer Tars‑Java versions have migrated to Netty, understanding NIO remains essential for mastering low‑level network programming.
vivo Internet Technology
Sharing practical vivo Internet technology insights and salon events, plus the latest industry news and hot conferences.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.