Netty TCP Demo: Architecture, Code Walkthrough, and Usage Guide
This article presents a comprehensive Netty TCP demo project, covering its architecture, module layout, business flow, detailed Java code for message queuing, client execution, handler implementation, testing endpoints, and deployment considerations, providing a practical learning resource for backend developers.
Project Background
The author needed a long‑lived socket connection for an IoT project, encountered many bugs, and decided to share a cleaned‑up demo that removes the messy business logic and focuses on the core Netty client implementation.
Main Content
1. Project Architecture
The project uses Netty, Redis, and Spring Boot 2.2.0.
2. Project Modules
The directory structure is shown in an image (omitted). The key modules are: netty-tcp-core – common utilities; netty-tcp-server – test server (not used in production); netty-tcp-client – the client, which is the focus of this article.
3. Business Flow
In the real project RocketMQ is used, but the demo replaces it with a BlockingQueue. The data flow is:
Producer → Message Queue → Consumer (client) → TCP channel → Server → TCP channel → Client.
When the consumer receives a device message, it checks the cache for an existing connection. If the connection exists and the channel is active, the message is sent directly; otherwise a new channel is created and the message is sent after activation.
4. Code Details
1. Message Queue
Because the demo removes the middleware, a local queue is created to simulate the real scenario.
package org.example.client;
import org.example.client.model.NettyMsgModel;
import java.util.concurrent.ArrayBlockingQueue;
/**
* This project is for demonstration; in production a message middleware such as RocketMQ or RabbitMQ should be used.
*
* @author ReWind00
* @date 2023/2/15 11:20
*/
public class QueueHolder {
private static final ArrayBlockingQueue<NettyMsgModel> queue = new ArrayBlockingQueue<>(100);
public static ArrayBlockingQueue<NettyMsgModel> get() {
return queue;
}
}A thread continuously takes messages from the queue and processes them asynchronously.
public class LoopThread implements Runnable {
@Override
public void run() {
for (int i = 0; i < MAIN_THREAD_POOL_SIZE; i++) {
executor.execute(() -> {
while (true) {
// Take the head object from the BlockingQueue; block if empty.
try {
NettyMsgModel nettyMsgModel = QueueHolder.get().take();
messageProcessor.process(nettyMsgModel);
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
}
}
});
}
}
}2. Execution Class
The process method belongs to the MessageProcessor class, which is a singleton but may be invoked by multiple threads.
public void process(NettyMsgModel nettyMsgModel) {
String imei = nettyMsgModel.getImei();
try {
synchronized (this) { // Prevent duplicate client creation for the same device.
if (redisCache.hasKey(NETTY_QUEUE_LOCK + imei)) {
log.info("imei={} message processing, re‑queueing", imei);
// Re‑queue with a delay (in production use RocketMQ or RabbitMQ for delayed consumption).
new Timer().schedule(new TimerTask() {
@Override
public void run() {
QueueHolder.get().offer(nettyMsgModel);
}
}, 2000);
log.info("imei={} message processing, re‑queue completed", imei);
return;
} else {
// Acquire lock if no connection exists.
redisCache.setCacheObject(NETTY_QUEUE_LOCK + imei, "1", 120, TimeUnit.SECONDS);
}
}
// If the cache contains a client, send the message; otherwise create a new client.
if (NettyClientHolder.get().containsKey(imei)) {
NettyClient nettyClient = NettyClientHolder.get().get(imei);
if (nettyClient.getChannelFuture() != null && nettyClient.getChannelFuture().channel().isActive()) {
if (!nettyClient.getChannelFuture().channel().isWritable()) {
log.warn("Warning, channel not writable, imei={}, channelId={}", nettyClient.getImei(), nettyClient.getChannelFuture().channel().id());
}
nettyClient.send(nettyMsgModel.getMsg());
} else {
log.info("client imei={}, channel inactive, closing", nettyClient.getImei());
nettyClient.close();
this.createClientAndSend(nettyMsgModel);
}
} else {
this.createClientAndSend(nettyMsgModel);
}
} catch (Exception e) {
log.error(e.getMessage(), e);
} finally {
// Release lock after processing.
redisCache.deleteObject(NETTY_QUEUE_LOCK + imei);
}
}The method uses the device IMEI as a cache key, applies a synchronized block and a Redis distributed lock to avoid duplicate client creation, and re‑queues messages with a delay when the lock is held.
After acquiring the lock, the thread creates a client if none exists, otherwise it sends the message directly.
private void createClientAndSend(NettyMsgModel nettyMsgModel) {
log.info("Creating client, imei={}", nettyMsgModel.getImei());
// DemoClientHandler can be customized per business.
NettyClient nettyClient = SpringUtils.getBean(NettyClient.class, nettyMsgModel.getImei(), nettyMsgModel.getBizData(),
this.createDefaultWorkGroup(this.workerThread), DemoClientHandler.class);
executor.execute(nettyClient); // Execute client initialization.
try {
// Wait for client activation with a lock.
synchronized (nettyClient) {
long c1 = System.currentTimeMillis();
nettyClient.wait(5000); // Auto‑unlock after 5 seconds if activation fails.
long c2 = System.currentTimeMillis();
log.info("Client creation wait time={}ms", c2 - c1);
}
if (nettyClient.getChannelFuture() != null && nettyClient.getChannelFuture().channel().isActive()) {
// Store in cache.
NettyClientHolder.get().put(nettyMsgModel.getImei(), nettyClient);
// Send message after activation.
nettyClient.send(nettyMsgModel.getMsg());
} else {
log.warn("Client creation failed, imei={}", nettyMsgModel.getImei());
nettyClient.close();
// Optionally re‑queue the message.
}
} catch (Exception e) {
log.error("Client initialization exception => {}", e.getMessage(), e);
}
}3. Client
package org.example.client;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.example.client.handler.BaseClientHandler;
import org.example.core.util.SpringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author ReWind00
* @date 2023/2/15 9:59
*/
@Slf4j
@Component
@Scope("prototype")
@Getter
@NoArgsConstructor
public class NettyClient implements Runnable {
@Value("${netty.server.port}")
private int port;
@Value("${netty.server.host}")
private String host;
// Unique device identifier
private String imei;
// Custom business data
private Map<String, Object> bizData;
private EventLoopGroup workGroup;
private Class<BaseClientHandler> clientHandlerClass;
private ChannelFuture channelFuture;
public NettyClient(String imei, Map<String, Object> bizData, EventLoopGroup workGroup, Class<BaseClientHandler> clientHandlerClass) {
this.imei = imei;
this.bizData = bizData;
this.workGroup = workGroup;
this.clientHandlerClass = clientHandlerClass;
}
@Override
public void run() {
try {
this.init();
log.info("Client started imei={}", imei);
} catch (Exception e) {
log.error("Client start failed:{}", e.getMessage(), e);
}
}
public void close() {
if (this.channelFuture != null) {
this.channelFuture.channel().close();
}
NettyClientHolder.get().remove(this.imei);
}
public void send(String message) {
try {
if (!this.channelFuture.channel().isActive()) {
log.info("Channel inactive imei={}", this.imei);
return;
}
if (!StringUtils.isEmpty(message)) {
log.info("Queue message sending => {}", message);
this.channelFuture.channel().writeAndFlush(message);
}
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
private void init() throws Exception {
// Pass this instance to the handler.
BaseClientHandler clientHandler = SpringUtils.getBean(clientHandlerClass, this);
Bootstrap b = new Bootstrap();
// Build client.
b.group(workGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
.option(ChannelOption.SO_RCVBUF, 1024 * 32)
.option(ChannelOption.SO_SNDBUF, 1024 * 32)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024 * 1024, Unpooled.copiedBuffer("
".getBytes())));
ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
ch.pipeline().addLast(new IdleStateHandler(0, 0, 600, TimeUnit.SECONDS));
ch.pipeline().addLast(clientHandler);
}
});
this.connect(b);
}
private void connect(Bootstrap b) throws InterruptedException {
long c1 = System.currentTimeMillis();
final int maxRetries = 2; // Retry twice.
final AtomicInteger count = new AtomicInteger();
final AtomicBoolean flag = new AtomicBoolean(false);
try {
this.channelFuture = b.connect(host, port).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
if (count.incrementAndGet() > maxRetries) {
log.warn("imei={} reconnection exceeded {} times", imei, maxRetries);
} else {
log.info("imei={} reconnection attempt {}", imei, count);
b.connect(host, port).addListener(this);
}
} else {
log.info("imei={} connected successfully, IP:{} port:{}", imei, host, port);
flag.set(true);
}
}
}).sync(); // Synchronous connection.
} catch (Exception e) {
log.error(e.getMessage(), e);
}
log.info("Device imei={} channelId={} connection time={}ms", imei, channelFuture.channel().id(), System.currentTimeMillis() - c1);
if (flag.get()) {
channelFuture.channel().closeFuture().sync(); // Block until channel closes.
}
}
}The client runs as a prototype bean; each instance blocks until its channel is closed. It supports two reconnection attempts and shares a common EventLoopGroup to avoid excessive thread creation.
4. Client Handler
package org.example.client.handler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
import org.example.client.NettyClient;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* @author ReWind00
* @date 2023/2/15 10:09
*/
@Slf4j
@Component
@Scope("prototype")
public class DemoClientHandler extends BaseClientHandler {
private final String imei;
private final Map<String, Object> bizData;
private final NettyClient nettyClient;
private int allIdleCounter = 0;
private static final int MAX_IDLE_TIMES = 3;
public DemoClientHandler(NettyClient nettyClient) {
this.nettyClient = nettyClient;
this.imei = nettyClient.getImei();
this.bizData = nettyClient.getBizData();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("Client imei={} channel activated", this.imei);
synchronized (this.nettyClient) { // Unlock the waiting thread after activation.
this.nettyClient.notify();
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.warn("Client imei={} channel disconnected", this.imei);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("Client imei={} received message: {}", this.imei, msg);
// Business processing ...
if ("shutdown".equals(msg)) {
this.nettyClient.close();
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
boolean close = false;
if (e.state() == IdleState.ALL_IDLE) {
this.allIdleCounter++;
log.info("Client imei={} idle count {}", this.imei, this.allIdleCounter);
if (this.allIdleCounter >= MAX_IDLE_TIMES) {
close = true;
}
}
if (close) {
log.warn("Read/write idle {} times, closing connection", MAX_IDLE_TIMES);
ctx.channel().close();
}
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("Client imei={} connection exception {}", imei, cause.getMessage(), cause);
}
}The handler notifies the waiting client thread when the channel becomes active, processes incoming messages, and closes the connection on shutdown or after a configurable number of idle events.
5. Client Cache
package org.example.client;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author ReWind00
* @date 2023/2/15 11:01
*/
public class NettyClientHolder {
private static final ConcurrentHashMap<String, NettyClient> clientMap = new ConcurrentHashMap<>();
public static ConcurrentHashMap<String, NettyClient> get() {
return clientMap;
}
}Because Netty channels cannot be serialized, the cache is kept in a local ConcurrentHashMap rather than Redis.
5. Testing
A Spring Boot controller provides three test endpoints: testOne – sends two messages with a 5‑second interval; the first triggers client creation, the second uses the existing channel. testTwo – sends an arbitrary message to a specified IMEI. testThree – sends two messages for the same device; the second is re‑queued due to the Redis lock and processed later.
Log screenshots (omitted) demonstrate client creation, message sending, lock‑based re‑queueing, and graceful shutdown via a "shutdown" message.
6. Source Code
Repository: https://gitee.com/jaster/netty-tcp-demo
Afterword
The demo is intended for learning and exchange only; production use requires additional improvements and considerations.
Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactand we will review it promptly.
Selected Java Interview Questions
A professional Java tech channel sharing common knowledge to help developers fill gaps. Follow us!
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
