Netty TCP Demo: Long‑Lived Socket Connection Architecture and Implementation
This article presents a complete Netty‑based TCP demo for IoT projects, detailing the project background, architecture, module layout, business flow, and step‑by‑step code explanations—including message queue handling, client creation, Netty client implementation, handler logic, caching, and testing endpoints—providing a practical reference for building robust long‑connection services.
Project Background – The author needed a reliable long‑connection socket solution for an IoT project, encountered many bugs, and decided to share a cleaned‑up demo that removes business‑specific code.
Architecture – The demo uses netty, redis and springboot2.2.0 to build a client‑server communication model.
Modules – The repository contains three main modules: netty-tcp-core (utilities), netty-tcp-server (test server, not used in production), and netty-tcp-client (the focus of the article).
Business Flow – Messages are produced, placed into a BlockingQueue (simulating RocketMQ), consumed by a loop thread, and then sent through a TCP channel to the server. The flow is illustrated as:
Producer → Message Queue → Consumer (client) → TCP channel → Server → TCP channel → Client
Code Details
1. Message Queue
package org.example.client;
import org.example.client.model.NettyMsgModel;
import java.util.concurrent.ArrayBlockingQueue;
/**
* 本项目为演示使用本地队列 实际生产中应该使用消息中间件代替(rocketmq或rabbitmq)
*/
public class QueueHolder {
private static final ArrayBlockingQueue<NettyMsgModel> queue = new ArrayBlockingQueue<>(100);
public static ArrayBlockingQueue<NettyMsgModel> get() { return queue; }
}The queue is polled by a LoopThread that continuously takes messages and processes them:
public class LoopThread implements Runnable {
@Override
public void run() {
for (int i = 0; i < MAIN_THREAD_POOL_SIZE; i++) {
executor.execute(() -> {
while (true) {
try {
NettyMsgModel nettyMsgModel = QueueHolder.get().take();
messageProcessor.process(nettyMsgModel);
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
}
}
});
}
}
}2. Execution Class
The process method in MessageProcessor handles locking, Redis cache checks, client creation, and message sending, ensuring that duplicate connections are avoided and that messages are retried when necessary.
public void process(NettyMsgModel nettyMsgModel) {
String imei = nettyMsgModel.getImei();
try {
synchronized (this) {
if (redisCache.hasKey(NETTY_QUEUE_LOCK + imei)) {
log.info("imei={}消息处理中,重新入列", imei);
new Timer().schedule(new TimerTask() {
@Override
public void run() { QueueHolder.get().offer(nettyMsgModel); }
}, 2000);
return;
} else {
redisCache.setCacheObject(NETTY_QUEUE_LOCK + imei, "1", 120, TimeUnit.SECONDS);
}
}
if (NettyClientHolder.get().containsKey(imei)) {
NettyClient nettyClient = NettyClientHolder.get().get(imei);
if (nettyClient.getChannelFuture() != null && nettyClient.getChannelFuture().channel().isActive()) {
nettyClient.send(nettyMsgModel.getMsg());
} else {
createClientAndSend(nettyMsgModel);
}
} else {
createClientAndSend(nettyMsgModel);
}
} catch (Exception e) {
log.error(e.getMessage(), e);
} finally {
redisCache.deleteObject(NETTY_QUEUE_LOCK + imei);
}
}3. Netty Client
The client is a prototype‑scoped Spring bean that holds connection parameters, business data, and a Netty ChannelFuture. It initializes a Bootstrap with appropriate options, adds decoders, encoders, idle handlers, and a custom handler, then attempts to connect with up to two retries.
public class NettyClient implements Runnable {
@Value("${netty.server.port}") private int port;
@Value("${netty.server.host}") private String host;
private String imei;
private Map<String, Object> bizData;
private EventLoopGroup workGroup;
private Class<BaseClientHandler> clientHandlerClass;
private ChannelFuture channelFuture;
// constructor omitted
@Override
public void run() {
try { init(); log.info("客户端启动imei={}", imei); }
catch (Exception e) { log.error("客户端启动失败:{}", e.getMessage(), e); }
}
private void init() throws Exception {
BaseClientHandler clientHandler = SpringUtils.getBean(clientHandlerClass, this);
Bootstrap b = new Bootstrap();
b.group(workGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024*1024, Unpooled.copiedBuffer("
".getBytes())));
ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
ch.pipeline().addLast(new IdleStateHandler(0,0,600, TimeUnit.SECONDS));
ch.pipeline().addLast(clientHandler);
}
});
connect(b);
}
// connect method with retry logic omitted for brevity
}4. DemoClientHandler
The handler processes channel activation, deactivation, inbound messages, idle events, and exceptions. On activation it notifies the waiting thread, allowing the queued message to be sent.
public class DemoClientHandler extends BaseClientHandler {
private final String imei;
private final Map<String, Object> bizData;
private final NettyClient nettyClient;
private int allIdleCounter = 0;
private static final int MAX_IDLE_TIMES = 3;
public DemoClientHandler(NettyClient nettyClient) {
this.nettyClient = nettyClient;
this.imei = nettyClient.getImei();
this.bizData = nettyClient.getBizData();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("客户端imei={},通道激活成功", imei);
synchronized (nettyClient) { nettyClient.notify(); }
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("客户端imei={},收到消息: {}", imei, msg);
if ("shutdown".equals(msg)) { nettyClient.close(); }
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
if (e.state() == IdleState.ALL_IDLE) {
allIdleCounter++;
if (allIdleCounter >= MAX_IDLE_TIMES) { ctx.channel().close(); }
}
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("客户端imei={},连接异常{}", imei, cause.getMessage(), cause);
}
}5. Client Cache
public class NettyClientHolder {
private static final ConcurrentHashMap<String, NettyClient> clientMap = new ConcurrentHashMap<>();
public static ConcurrentHashMap<String, NettyClient> get() { return clientMap; }
}Because Netty channels are not serializable, they are stored only in local memory.
6. Testing Endpoints
A Spring MVC controller provides three test APIs: /demo/testOne – Sends two messages with a 5‑second interval, demonstrating client creation on the first message and reuse on the second. /demo/testTwo?imei=...&msg=... – Sends an arbitrary message to a specified device. /demo/testThree – Sends two messages for the same device back‑to‑back, causing the second to be re‑queued due to the Redis lock.
Log screenshots show the expected behavior: client creation, message sending, idle handling, and graceful shutdown.
Conclusion – The demo is intended for learning and should be adapted before production use, e.g., replacing the local queue with a real message broker, adding login handshake, and configuring thread groups appropriately.
Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactand we will review it promptly.
Top Architect
Top Architect focuses on sharing practical architecture knowledge, covering enterprise, system, website, large‑scale distributed, and high‑availability architectures, plus architecture adjustments using internet technologies. We welcome idea‑driven, sharing‑oriented architects to exchange and learn together.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
