Netty TCP Long‑Connection Demo with Spring Boot, Redis, and Message‑Queue Simulation
This article presents a complete Netty TCP long‑connection demo built with Spring Boot 2.2.0 and Redis, explains the project architecture, module layout, business flow using a local BlockingQueue instead of RocketMQ, and provides detailed source code for the queue holder, processing logic, client implementation, handler, and test controllers.
Project background: the author needed a socket long‑connection for an IoT project, encountered many bugs, and decided to share a cleaned‑up demo for learning purposes.
Architecture: the demo combines Netty, Redis and Spring Boot 2.2.0. It is divided into three modules – netty‑tcp‑core (utility classes), netty‑tcp‑server (test server, not used in production) and netty‑tcp‑client (the focus of the article).
Modules: the directory structure is illustrated in the original article; the core module provides common tools, the server module is only for testing, and the client module implements the actual communication logic.
Business flow: the original project used RocketMQ as a message queue, but the demo replaces it with a local BlockingQueue. The data flow is Producer → Message Queue → Consumer (client) → TCP channel → Server → TCP channel → Client.
Code details:
package org.example.client;
import org.example.client.model.NettyMsgModel;
import java.util.concurrent.ArrayBlockingQueue;
/**
* This demo uses a local queue; in production a real MQ such as RocketMQ or RabbitMQ should be used.
*/
public class QueueHolder {
private static final ArrayBlockingQueue<NettyMsgModel> queue = new ArrayBlockingQueue<>(100);
public static ArrayBlockingQueue<NettyMsgModel> get() { return queue; }
}The processing thread continuously takes messages from the queue and hands them to the message processor.
public class LoopThread implements Runnable {
@Override
public void run() {
for (int i = 0; i < MAIN_THREAD_POOL_SIZE; i++) {
executor.execute(() -> {
while (true) {
try {
NettyMsgModel nettyMsgModel = QueueHolder.get().take();
messageProcessor.process(nettyMsgModel);
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
}
}
});
}
}
}The process method handles message deduplication, locking with Redis, client lookup/creation, and sending.
public void process(NettyMsgModel nettyMsgModel) {
String imei = nettyMsgModel.getImei();
try {
synchronized (this) {
if (redisCache.hasKey(NETTY_QUEUE_LOCK + imei)) {
log.info("imei={} message processing, re‑queue", imei);
new Timer().schedule(new TimerTask() {
@Override
public void run() { QueueHolder.get().offer(nettyMsgModel); }
}, 2000);
log.info("imei={} re‑queued", imei);
return;
} else {
redisCache.setCacheObject(NETTY_QUEUE_LOCK + imei, "1", 120, TimeUnit.SECONDS);
}
}
if (NettyClientHolder.get().containsKey(imei)) {
NettyClient nettyClient = NettyClientHolder.get().get(imei);
if (nettyClient.getChannelFuture() != null && nettyClient.getChannelFuture().channel().isActive()) {
if (!nettyClient.getChannelFuture().channel().isWritable()) {
log.warn("Channel not writable, imei={}, channelId={}", imei, nettyClient.getChannelFuture().channel().id());
}
nettyClient.send(nettyMsgModel.getMsg());
} else {
log.info("client imei={} channel inactive, closing", imei);
nettyClient.close();
this.createClientAndSend(nettyMsgModel);
}
} else {
this.createClientAndSend(nettyMsgModel);
}
} catch (Exception e) {
log.error(e.getMessage(), e);
} finally {
redisCache.deleteObject(NETTY_QUEUE_LOCK + imei);
}
}Client creation and sending logic ensures the client is initialized, waits (up to 5 seconds) for the channel to become active, stores the client in a local cache, and then sends the message.
private void createClientAndSend(NettyMsgModel nettyMsgModel) {
log.info("Creating client for imei={}", nettyMsgModel.getImei());
NettyClient nettyClient = SpringUtils.getBean(NettyClient.class, nettyMsgModel.getImei(), nettyMsgModel.getBizData(),
this.createDefaultWorkGroup(this.workerThread), DemoClientHandler.class);
executor.execute(nettyClient);
try {
synchronized (nettyClient) {
long start = System.currentTimeMillis();
nettyClient.wait(5000);
long end = System.currentTimeMillis();
log.info("Client wait time={}ms", end - start);
}
if (nettyClient.getChannelFuture() != null && nettyClient.getChannelFuture().channel().isActive()) {
NettyClientHolder.get().put(nettyMsgModel.getImei(), nettyClient);
nettyClient.send(nettyMsgModel.getMsg());
} else {
log.warn("Client creation failed, imei={}", nettyMsgModel.getImei());
nettyClient.close();
}
} catch (Exception e) {
log.error("Client init error => {}", e.getMessage(), e);
}
}The Netty client implements Runnable, configures bootstrap options, handles reconnection attempts, and provides send and close methods.
public class NettyClient implements Runnable {
@Value("${netty.server.port}") private int port;
@Value("${netty.server.host}") private String host;
private String imei;
private Map<String, Object> bizData;
private EventLoopGroup workGroup;
private Class<BaseClientHandler> clientHandlerClass;
private ChannelFuture channelFuture;
// constructor, run(), init(), connect(), send(), close() ...
}The handler ( DemoClientHandler) reacts to channel activation, reads incoming messages, handles idle events, and logs exceptions.
public class DemoClientHandler extends BaseClientHandler {
private final String imei;
private final Map<String, Object> bizData;
private final NettyClient nettyClient;
private int allIdleCounter = 0;
private static final int MAX_IDLE_TIMES = 3;
public DemoClientHandler(NettyClient nettyClient) { this.nettyClient = nettyClient; this.imei = nettyClient.getImei(); this.bizData = nettyClient.getBizData(); }
@Override public void channelActive(ChannelHandlerContext ctx) { log.info("Client imei={} channel active", imei); synchronized (nettyClient) { nettyClient.notify(); } }
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) { log.info("Client imei={} received: {}", imei, msg); if ("shutdown".equals(msg)) nettyClient.close(); }
@Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { /* idle handling */ }
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { log.error("Client imei={} exception {}", imei, cause.getMessage(), cause); }
}A simple in‑memory holder stores active clients:
public class NettyClientHolder {
private static final ConcurrentHashMap<String, NettyClient> clientMap = new ConcurrentHashMap<>();
public static ConcurrentHashMap<String, NettyClient> get() { return clientMap; }
}Test controllers expose three endpoints ( testOne, testTwo, testThree) that enqueue messages, demonstrate delayed consumption caused by the Redis lock, and show how a shutdown message can close a connection. Log screenshots from the original article illustrate the behavior.
Conclusion: the demo is intended for learning and sharing; it is not production‑ready and readers are encouraged to discuss improvements.
Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactand we will review it promptly.
Architect
Professional architect sharing high‑quality architecture insights. Topics include high‑availability, high‑performance, high‑stability architectures, big data, machine learning, Java, system and distributed architecture, AI, and practical large‑scale architecture case studies. Open to ideas‑driven architects who enjoy sharing and learning.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
