Netty TCP Demo: Long‑Lived Socket Connection Architecture and Implementation

This article presents a complete Netty‑based TCP demo for IoT projects, detailing the project background, architecture, module layout, business flow, and step‑by‑step code explanations—including message queue handling, client creation, Netty client implementation, handler logic, caching, and testing endpoints—providing a practical reference for building robust long‑connection services.

Top Architect
Top Architect
Top Architect
Netty TCP Demo: Long‑Lived Socket Connection Architecture and Implementation

Project Background – The author needed a reliable long‑connection socket solution for an IoT project, encountered many bugs, and decided to share a cleaned‑up demo that removes business‑specific code.

Architecture – The demo uses netty, redis and springboot2.2.0 to build a client‑server communication model.

Modules – The repository contains three main modules: netty-tcp-core (utilities), netty-tcp-server (test server, not used in production), and netty-tcp-client (the focus of the article).

Business Flow – Messages are produced, placed into a BlockingQueue (simulating RocketMQ), consumed by a loop thread, and then sent through a TCP channel to the server. The flow is illustrated as:

Producer → Message Queue → Consumer (client) → TCP channel → Server → TCP channel → Client

Code Details

1. Message Queue

package org.example.client;

import org.example.client.model.NettyMsgModel;
import java.util.concurrent.ArrayBlockingQueue;
/**
 * 本项目为演示使用本地队列 实际生产中应该使用消息中间件代替(rocketmq或rabbitmq)
 */
public class QueueHolder {
    private static final ArrayBlockingQueue<NettyMsgModel> queue = new ArrayBlockingQueue<>(100);
    public static ArrayBlockingQueue<NettyMsgModel> get() { return queue; }
}

The queue is polled by a LoopThread that continuously takes messages and processes them:

public class LoopThread implements Runnable {
    @Override
    public void run() {
        for (int i = 0; i < MAIN_THREAD_POOL_SIZE; i++) {
            executor.execute(() -> {
                while (true) {
                    try {
                        NettyMsgModel nettyMsgModel = QueueHolder.get().take();
                        messageProcessor.process(nettyMsgModel);
                    } catch (InterruptedException e) {
                        log.error(e.getMessage(), e);
                    }
                }
            });
        }
    }
}

2. Execution Class

The process method in MessageProcessor handles locking, Redis cache checks, client creation, and message sending, ensuring that duplicate connections are avoided and that messages are retried when necessary.

public void process(NettyMsgModel nettyMsgModel) {
    String imei = nettyMsgModel.getImei();
    try {
        synchronized (this) {
            if (redisCache.hasKey(NETTY_QUEUE_LOCK + imei)) {
                log.info("imei={}消息处理中,重新入列", imei);
                new Timer().schedule(new TimerTask() {
                    @Override
                    public void run() { QueueHolder.get().offer(nettyMsgModel); }
                }, 2000);
                return;
            } else {
                redisCache.setCacheObject(NETTY_QUEUE_LOCK + imei, "1", 120, TimeUnit.SECONDS);
            }
        }
        if (NettyClientHolder.get().containsKey(imei)) {
            NettyClient nettyClient = NettyClientHolder.get().get(imei);
            if (nettyClient.getChannelFuture() != null && nettyClient.getChannelFuture().channel().isActive()) {
                nettyClient.send(nettyMsgModel.getMsg());
            } else {
                createClientAndSend(nettyMsgModel);
            }
        } else {
            createClientAndSend(nettyMsgModel);
        }
    } catch (Exception e) {
        log.error(e.getMessage(), e);
    } finally {
        redisCache.deleteObject(NETTY_QUEUE_LOCK + imei);
    }
}

3. Netty Client

The client is a prototype‑scoped Spring bean that holds connection parameters, business data, and a Netty ChannelFuture. It initializes a Bootstrap with appropriate options, adds decoders, encoders, idle handlers, and a custom handler, then attempts to connect with up to two retries.

public class NettyClient implements Runnable {
    @Value("${netty.server.port}") private int port;
    @Value("${netty.server.host}") private String host;
    private String imei;
    private Map<String, Object> bizData;
    private EventLoopGroup workGroup;
    private Class<BaseClientHandler> clientHandlerClass;
    private ChannelFuture channelFuture;
    // constructor omitted
    @Override
    public void run() {
        try { init(); log.info("客户端启动imei={}", imei); }
        catch (Exception e) { log.error("客户端启动失败:{}", e.getMessage(), e); }
    }
    private void init() throws Exception {
        BaseClientHandler clientHandler = SpringUtils.getBean(clientHandlerClass, this);
        Bootstrap b = new Bootstrap();
        b.group(workGroup)
         .channel(NioSocketChannel.class)
         .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
         .handler(new ChannelInitializer<SocketChannel>() {
             @Override
             protected void initChannel(SocketChannel ch) throws Exception {
                 ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024*1024, Unpooled.copiedBuffer("
".getBytes())));
                 ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
                 ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
                 ch.pipeline().addLast(new IdleStateHandler(0,0,600, TimeUnit.SECONDS));
                 ch.pipeline().addLast(clientHandler);
             }
         });
        connect(b);
    }
    // connect method with retry logic omitted for brevity
}

4. DemoClientHandler

The handler processes channel activation, deactivation, inbound messages, idle events, and exceptions. On activation it notifies the waiting thread, allowing the queued message to be sent.

public class DemoClientHandler extends BaseClientHandler {
    private final String imei;
    private final Map<String, Object> bizData;
    private final NettyClient nettyClient;
    private int allIdleCounter = 0;
    private static final int MAX_IDLE_TIMES = 3;
    public DemoClientHandler(NettyClient nettyClient) {
        this.nettyClient = nettyClient;
        this.imei = nettyClient.getImei();
        this.bizData = nettyClient.getBizData();
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("客户端imei={},通道激活成功", imei);
        synchronized (nettyClient) { nettyClient.notify(); }
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("客户端imei={},收到消息: {}", imei, msg);
        if ("shutdown".equals(msg)) { nettyClient.close(); }
    }
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent e = (IdleStateEvent) evt;
            if (e.state() == IdleState.ALL_IDLE) {
                allIdleCounter++;
                if (allIdleCounter >= MAX_IDLE_TIMES) { ctx.channel().close(); }
            }
        }
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("客户端imei={},连接异常{}", imei, cause.getMessage(), cause);
    }
}

5. Client Cache

public class NettyClientHolder {
    private static final ConcurrentHashMap<String, NettyClient> clientMap = new ConcurrentHashMap<>();
    public static ConcurrentHashMap<String, NettyClient> get() { return clientMap; }
}

Because Netty channels are not serializable, they are stored only in local memory.

6. Testing Endpoints

A Spring MVC controller provides three test APIs: /demo/testOne – Sends two messages with a 5‑second interval, demonstrating client creation on the first message and reuse on the second. /demo/testTwo?imei=...&msg=... – Sends an arbitrary message to a specified device. /demo/testThree – Sends two messages for the same device back‑to‑back, causing the second to be re‑queued due to the Redis lock.

Log screenshots show the expected behavior: client creation, message sending, idle handling, and graceful shutdown.

Conclusion – The demo is intended for learning and should be adapted before production use, e.g., replacing the local queue with a real message broker, adding login handshake, and configuring thread groups appropriately.

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

Socket
Top Architect
Written by

Top Architect

Top Architect focuses on sharing practical architecture knowledge, covering enterprise, system, website, large‑scale distributed, and high‑availability architectures, plus architecture adjustments using internet technologies. We welcome idea‑driven, sharing‑oriented architects to exchange and learn together.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.