Building a TCP Long‑Connection Solution with Netty and Spring Boot
This article walks through the design and implementation of a TCP long‑connection communication scheme using Netty and Spring Boot 2.2.0, covering project architecture, module layout, message flow, detailed code explanations, testing procedures, and deployment considerations.
Architecture
The demo combines Netty, Redis and Spring Boot 2.2.0. It is organized into three Maven modules:
netty-tcp-core – shared utility classes.
netty-tcp-server – a test server (not used in production).
netty-tcp-client – the main client implementation.
Message flow
In production RocketMQ is used; the demo replaces it with a local BlockingQueue. The data path is:
Producer → Message Queue → Consumer (client) → TCP channel → Server → TCP channel → Client
When a message arrives, the consumer checks whether a TCP channel for the device (identified by its IMEI) already exists and is active. If so, the message is sent directly; otherwise a new channel is created, the message is held until the channel becomes active, and then sent.
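The consumer-side decision described above can be sketched without Netty. This is a minimal illustration, not the demo's code: FakeChannel and FlowSketch are hypothetical names, the channel is an in-memory stand-in, and poll() is used instead of take() so the sketch never blocks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for a per-device TCP channel; not part of the demo. */
class FakeChannel {
    final String imei;
    final List<String> sent = new ArrayList<>();
    boolean active = true;

    FakeChannel(String imei) { this.imei = imei; }

    void send(String msg) { sent.add(msg); }
}

/** Consumer side of the flow: reuse an active channel or create a new one. */
class FlowSketch {
    // each entry is {imei, message}
    final BlockingQueue<String[]> queue = new ArrayBlockingQueue<>(100);
    final Map<String, FakeChannel> channels = new ConcurrentHashMap<>();
    int created = 0;

    /** Routes one queued message; returns false if the queue was empty. */
    boolean consumeOne() {
        String[] m = queue.poll();
        if (m == null) {
            return false;
        }
        FakeChannel ch = channels.get(m[0]);
        if (ch == null || !ch.active) {
            // the real client would connect here and wait for channelActive
            ch = new FakeChannel(m[0]);
            channels.put(m[0], ch);
            created++;
        }
        ch.send(m[1]);
        return true;
    }
}
```

Two messages for the same IMEI create one channel; the second message reuses it.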
Message queue
A static ArrayBlockingQueue<NettyMsgModel> holds incoming messages. The queue is accessed via a singleton holder:
package org.example.client;

import java.util.concurrent.ArrayBlockingQueue;

public class QueueHolder {
    private static final ArrayBlockingQueue<NettyMsgModel> queue = new ArrayBlockingQueue<>(100);

    public static ArrayBlockingQueue<NettyMsgModel> get() {
        return queue;
    }
}
Processing thread
A dedicated thread pool continuously takes messages from the queue (blocking with take()) and forwards them to MessageProcessor:
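public class LoopThread implements Runnable {
    // executor, messageProcessor, log and MAIN_THREAD_POOL_SIZE are assumed to be
    // members of the full class; they are not shown in this excerpt.
    @Override
    public void run() {
        for (int i = 0; i < MAIN_THREAD_POOL_SIZE; i++) {
            executor.execute(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        // take() blocks until a message is available
                        NettyMsgModel nettyMsgModel = QueueHolder.get().take();
                        messageProcessor.process(nettyMsgModel);
                    } catch (InterruptedException e) {
                        log.error(e.getMessage(), e);
                        // restore the flag so the loop exits instead of spinning
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }
    }
}
MessageProcessor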
The processor obtains a Redis lock keyed by the device IMEI to avoid duplicate client creation. If the lock exists, the message is re‑queued with a 2 second delay; otherwise the lock is set and the cache is consulted:
If a client exists and its channel is active, the message is sent.
If the channel is inactive, the client is closed and a new client is created.
If no client exists, a new client is created.
After processing, the Redis lock is removed.
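The locking step can be looked at in isolation. The sketch below is an in-memory stand-in, not the demo's Redis code: ImeiLockSketch and the key prefix are hypothetical, and ConcurrentHashMap.putIfAbsent plays the role of an atomic "set if absent". It illustrates two points: checking and setting the lock in one atomic operation, and releasing the lock only from the call that acquired it.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** In-memory stand-in for a Redis "set if absent" lock; hypothetical, for illustration. */
class ImeiLockSketch {
    private final Map<String, String> store = new ConcurrentHashMap<>();

    /** Returns true only for the caller that created the key (atomic check-and-set). */
    boolean tryLock(String imei) {
        return store.putIfAbsent("netty_queue_lock:" + imei, "1") == null;
    }

    /** Releases the lock; call this only if tryLock returned true. */
    void unlock(String imei) {
        store.remove("netty_queue_lock:" + imei);
    }

    /** Processes one message; returns "sent" or "requeued" to show which path ran. */
    String process(String imei, Runnable send) {
        if (!tryLock(imei)) {
            // the lock belongs to another worker, so it must not be released here
            return "requeued";
        }
        try {
            send.run();
            return "sent";
        } finally {
            // release only the lock this call acquired
            unlock(imei);
        }
    }
}
```

With Spring Data Redis, ValueOperations.setIfAbsent(key, value, timeout, unit) provides an atomic set-with-expiry that also holds across JVMs, whereas synchronized only covers a single process. Whether the demo's redisCache wrapper exposes such a call is not shown in the article.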
public void process(NettyMsgModel nettyMsgModel) {
    String imei = nettyMsgModel.getImei();
    boolean locked = false; // true only if this call acquired the Redis lock
    try {
        synchronized (this) {
            if (redisCache.hasKey(NETTY_QUEUE_LOCK + imei)) {
                log.info("imei={} message is being processed, re-queueing", imei);
                new Timer().schedule(new TimerTask() {
                    @Override
                    public void run() {
                        QueueHolder.get().offer(nettyMsgModel);
                    }
                }, 2000);
                return;
            } else {
                redisCache.setCacheObject(NETTY_QUEUE_LOCK + imei, "1", 120, TimeUnit.SECONDS);
                locked = true;
            }
        }
        if (NettyClientHolder.get().containsKey(imei)) {
            NettyClient nettyClient = NettyClientHolder.get().get(imei);
            if (nettyClient.getChannelFuture() != null && nettyClient.getChannelFuture().channel().isActive()) {
                if (!nettyClient.getChannelFuture().channel().isWritable()) {
                    log.warn("channel not writable, imei={}, channelId={}", imei, nettyClient.getChannelFuture().channel().id());
                }
                nettyClient.send(nettyMsgModel.getMsg());
            } else {
                log.info("client imei={}, channel inactive, closing it", imei);
                nettyClient.close();
                this.createClientAndSend(nettyMsgModel);
            }
        } else {
            this.createClientAndSend(nettyMsgModel);
        }
    } catch (Exception e) {
        log.error(e.getMessage(), e);
    } finally {
        // Release the lock only if this call set it. Without this guard the early
        // return above would delete a lock still held by another worker.
        if (locked) {
            redisCache.deleteObject(NETTY_QUEUE_LOCK + imei);
        }
    }
}
Client creation
The method creates a NettyClient bean, submits it to a thread pool, then waits up to 5 seconds for the client to become active by calling wait() inside a synchronized block. On success the client is stored in NettyClientHolder and the message is sent; on timeout or failure a warning is logged and the client is closed. The demo does not re-queue the message in that case, although it could.
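A plain wait()/notify() handshake has two known weaknesses: a notification sent before the waiter enters wait() is lost, and wait() may return spuriously. One possible alternative, sketched here with a hypothetical ActivationSignal class that is not part of the demo, is a CountDownLatch, which remembers that the signal has already been given.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: signals "channel active" without losing early notifications. */
class ActivationSignal {
    private final CountDownLatch active = new CountDownLatch(1);

    /** Would be called from the handler's channelActive(); safe to call before anyone waits. */
    void markActive() {
        active.countDown();
    }

    /** Waits up to timeoutMs; returns true if the channel became active in time. */
    boolean awaitActive(long timeoutMs) {
        try {
            return active.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // preserve the interrupt for the caller
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Under this assumption the creating thread would call awaitActive(5000) in place of the synchronized wait, and the handler would call markActive() in place of notify().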
private void createClientAndSend(NettyMsgModel nettyMsgModel) {
    log.info("creating client, imei={}", nettyMsgModel.getImei());
    NettyClient nettyClient = SpringUtils.getBean(
            NettyClient.class,
            nettyMsgModel.getImei(),
            nettyMsgModel.getBizData(),
            this.createDefaultWorkGroup(this.workerThread),
            DemoClientHandler.class);
    executor.execute(nettyClient);
    try {
        // wait until channelActive() notifies, or until 5 seconds have passed
        synchronized (nettyClient) {
            long start = System.currentTimeMillis();
            nettyClient.wait(5000);
            long cost = System.currentTimeMillis() - start;
            log.info("client creation wait took {}ms", cost);
        }
        if (nettyClient.getChannelFuture() != null && nettyClient.getChannelFuture().channel().isActive()) {
            NettyClientHolder.get().put(nettyMsgModel.getImei(), nettyClient);
            nettyClient.send(nettyMsgModel.getMsg());
        } else {
            log.warn("client creation failed, imei={}", nettyMsgModel.getImei());
            nettyClient.close();
        }
    } catch (Exception e) {
        log.error("error sending message during client initialization => {}", e.getMessage(), e);
    }
}
Netty client
The client is a prototype Spring bean. It receives the IMEI, business data, an EventLoopGroup, and a handler class. The run() method initializes a Bootstrap with connection timeout, buffer sizes and an IdleStateHandler. Connection attempts are retried twice; on success the ChannelFuture is stored and the thread blocks on closeFuture() until the channel closes.
@Component @Scope("prototype") @Getter @NoArgsConstructor @Slf4j
public class NettyClient implements Runnable {
    @Value("${netty.server.port}") private int port;
    @Value("${netty.server.host}") private String host;
    private String imei;
    private Map<String, Object> bizData;
    private EventLoopGroup workGroup;
    private Class<BaseClientHandler> clientHandlerClass;
    private ChannelFuture channelFuture;

    public NettyClient(String imei, Map<String, Object> bizData, EventLoopGroup workGroup,
                       Class<BaseClientHandler> clientHandlerClass) {
        this.imei = imei;
        this.bizData = bizData;
        this.workGroup = workGroup;
        this.clientHandlerClass = clientHandlerClass;
    }

    @Override
    public void run() {
        try {
            init();
            log.info("client started, imei={}", imei);
        } catch (Exception e) {
            log.error("client failed to start: {}", e.getMessage(), e);
        }
    }

    private void init() throws Exception {
        BaseClientHandler clientHandler = SpringUtils.getBean(clientHandlerClass, this);
        Bootstrap b = new Bootstrap();
        b.group(workGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                .option(ChannelOption.SO_RCVBUF, 1024 * 32)
                .option(ChannelOption.SO_SNDBUF, 1024 * 32)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        // inbound frames are split on the newline delimiter
                        ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024 * 1024, Unpooled.copiedBuffer("\n".getBytes())));
                        ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
                        ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
                        ch.pipeline().addLast(new IdleStateHandler(0, 0, 600, TimeUnit.SECONDS));
                        ch.pipeline().addLast(clientHandler);
                    }
                });
        connect(b);
    }

    private void connect(Bootstrap b) throws InterruptedException {
        long start = System.currentTimeMillis();
        final int maxRetries = 2;
        // One initial attempt plus up to maxRetries retries. await() is used instead of
        // sync() because sync() rethrows a connection failure and would skip the retries.
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            if (attempt > 0) {
                log.info("imei={} reconnect attempt {}", imei, attempt);
            }
            this.channelFuture = b.connect(host, port);
            this.channelFuture.await();
            if (this.channelFuture.isSuccess()) {
                log.info("imei={} connected, IP={} port={}", imei, host, port);
                break;
            }
        }
        log.info("device imei={} channelId={} connect time={}ms", imei, channelFuture.channel().id(), System.currentTimeMillis() - start);
        if (channelFuture.isSuccess()) {
            // block this worker thread until the channel is closed
            channelFuture.channel().closeFuture().sync();
        } else {
            log.warn("imei={} still not connected after {} retries", imei, maxRetries);
        }
    }

    public void send(String message) {
        try {
            if (!channelFuture.channel().isActive()) {
                log.info("channel inactive, imei={}", imei);
                return;
            }
            if (!StringUtils.isEmpty(message)) {
                log.info("sending queued message => {}", message);
                channelFuture.channel().writeAndFlush(message);
            }
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }

    public void close() {
        if (channelFuture != null) {
            channelFuture.channel().close();
        }
        NettyClientHolder.get().remove(imei);
    }
}
Handler
DemoClientHandler extends a base handler. In channelActive() it notifies the waiting client thread. channelRead() logs incoming messages and closes the client on a special "shutdown" command. userEventTriggered() counts ALL_IDLE events; after three occurrences the handler closes the channel.
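The idle counter in the handler is only ever incremented, so three idle events spread over the life of a connection are enough to close it. If the intent is to close only after three idle periods in a row, the counter would have to be reset whenever data arrives. A minimal sketch of that variant, using a hypothetical IdleCounter class that is not part of the demo:

```java
/** Hypothetical counter for consecutive idle events; not part of the demo code. */
class IdleCounter {
    private final int maxIdleTimes;
    private int idleCount = 0;

    IdleCounter(int maxIdleTimes) {
        this.maxIdleTimes = maxIdleTimes;
    }

    /** Call on every ALL_IDLE event; returns true when the channel should be closed. */
    boolean onIdle() {
        idleCount++;
        return idleCount >= maxIdleTimes;
    }

    /** Call whenever data is read, so only uninterrupted idle periods accumulate. */
    void onRead() {
        idleCount = 0;
    }
}
```

In a handler, onIdle() would be called from userEventTriggered() and onRead() from channelRead(). No synchronization is used here because Netty invokes the handler methods of one channel on a single event loop thread.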
@Component @Scope("prototype") @Slf4j
public class DemoClientHandler extends BaseClientHandler {
    private final String imei;
    private final Map<String, Object> bizData;
    private final NettyClient nettyClient;
    private int allIdleCounter = 0;
    private static final int MAX_IDLE_TIMES = 3;

    public DemoClientHandler(NettyClient nettyClient) {
        this.nettyClient = nettyClient;
        this.imei = nettyClient.getImei();
        this.bizData = nettyClient.getBizData();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("client imei={}, channel active", imei);
        // wake up the thread waiting in createClientAndSend()
        synchronized (nettyClient) {
            nettyClient.notify();
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("client imei={}, message received: {}", imei, msg);
        if ("shutdown".equals(msg)) {
            nettyClient.close();
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent e = (IdleStateEvent) evt;
            if (e.state() == IdleState.ALL_IDLE) {
                allIdleCounter++;
                log.info("client imei={} read/write idle event #{}", imei, allIdleCounter);
                if (allIdleCounter >= MAX_IDLE_TIMES) {
                    log.warn("read/write idle limit of {} reached, closing connection", MAX_IDLE_TIMES);
                    ctx.channel().close();
                }
            }
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("client imei={}, connection error {}", imei, cause.getMessage(), cause);
    }
}
Client cache
The cache stores active clients in a local ConcurrentHashMap because Netty channels are not serializable:
public class NettyClientHolder {
    private static final ConcurrentHashMap<String, NettyClient> clientMap = new ConcurrentHashMap<>();

    public static ConcurrentHashMap<String, NettyClient> get() {
        return clientMap;
    }
}
Testing
A Spring MVC controller exposes three endpoints:
/demo/testOne – pushes two messages with a 5 second pause; the first triggers client creation, the second uses the existing channel.
/demo/testTwo – pushes a single message with custom IMEI and payload.
/demo/testThree – pushes two messages for the same device consecutively; the second is re‑queued and delayed because the Redis lock is held.
Log output confirms the expected behavior: client creation, channel reuse, and delayed consumption when the lock prevents concurrent creation.
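The delayed consumption seen in /demo/testThree comes from the re-queue path, which in the listing creates a new java.util.Timer for each delayed message; every Timer starts its own background thread. A possible alternative is to schedule all re-queues on one shared executor. The sketch below uses hypothetical names (DelayedRequeueSketch, requeueLater) and a String queue in place of NettyMsgModel.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: re-queues messages after a delay on one shared scheduler thread. */
class DelayedRequeueSketch {
    final BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    /** Puts msg back on the queue after delayMs milliseconds. */
    void requeueLater(String msg, long delayMs) {
        scheduler.schedule(() -> { queue.offer(msg); }, delayMs, TimeUnit.MILLISECONDS);
    }

    /** Waits up to timeoutMs for the next message; returns null on timeout. */
    String awaitNext(long timeoutMs) {
        try {
            return queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /** Stops the scheduler thread so the JVM can exit. */
    void shutdown() {
        scheduler.shutdownNow();
    }
}
```

A call such as requeueLater(msg, 2000) would correspond to the 2 second delay used in the demo.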
Source repository
https://gitee.com/jaster/netty-tcp-demo
IoT Full-Stack Technology
