Netty TCP Client Demo with Spring Boot: Architecture, Message Queue, and Complete Code Walkthrough

This tutorial builds a Netty-based TCP client with Spring Boot for IoT socket communication. It covers the project architecture, the module layout, a simulated message queue, the Java code for client creation and connection handling, and the test endpoints.

Top Architect

Project Background

A recent company IoT project needed long-lived socket connections for message communication. The first version had many bugs, so the author refined the code into this demo, stripped out the business-specific parts, and shares it here for learning.

Main Content

The author, a senior architect, introduces the demo and its purpose.

1. Project Architecture

The project uses Netty, Redis, and Spring Boot 2.2.0.

2. Project Modules

The directory structure is shown below (image omitted). The netty-tcp-core module contains utilities, netty-tcp-server is a test server, and netty-tcp-client is the focus of this article.

3. Business Process

In the real project RocketMQ is used as a message queue; the demo replaces it with a BlockingQueue. The data flow is:

Producer → Message Queue → Consumer (client) → TCP channel → Server → TCP channel → Client

When the consumer receives a device message, it checks whether a connection to the server already exists; if the channel is active it sends the message, otherwise it creates the channel, waits for activation, then sends. The client processes the server response accordingly.
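The decision described above can be sketched without any networking. This is a minimal illustration, not the project's code: DeviceLink, RecordingLink, and DispatchSketch are hypothetical names, and the connector function stands in for creating a client and waiting for its channel to activate.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical stand-in for a TCP channel; not a type from the demo project. */
interface DeviceLink {
    boolean isActive();
    void send(String msg);
}

/** In-memory fake used to exercise the flow without a server. */
class RecordingLink implements DeviceLink {
    final List<String> sent = new ArrayList<>();
    boolean active = true;

    @Override public boolean isActive() { return active; }
    @Override public void send(String msg) { sent.add(msg); }
}

class DispatchSketch {
    private final Map<String, DeviceLink> links = new ConcurrentHashMap<>();
    private final Function<String, DeviceLink> connector; // opens a link for an IMEI

    DispatchSketch(Function<String, DeviceLink> connector) {
        this.connector = connector;
    }

    /** Returns true if the message was handed to an active link. */
    boolean dispatch(String imei, String msg) {
        DeviceLink link = links.get(imei);
        if (link == null || !link.isActive()) {
            link = connector.apply(imei); // create, or re-create after a drop
            if (link == null || !link.isActive()) {
                links.remove(imei);
                return false; // the caller may re-queue the message
            }
            links.put(imei, link);
        }
        link.send(msg);
        return true;
    }
}
```

The sketch does not guard against two threads creating a link for the same IMEI at once; the demo handles that with a per-device lock.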

4. Code Details

1. Message Queue

Because the demo removes the external middleware, a local queue is created to simulate the real scenario.

package org.example.client;

import org.example.client.model.NettyMsgModel;
import java.util.concurrent.ArrayBlockingQueue;
/**
 * This demo uses a local queue; in production, use a message broker
 * (RocketMQ or RabbitMQ) instead.
 *
 * @author ReWind00
 * @date 2023/2/15 11:20
 */
public class QueueHolder {
    private static final ArrayBlockingQueue<NettyMsgModel> queue = new ArrayBlockingQueue<>(100);
    public static ArrayBlockingQueue<NettyMsgModel> get() {
        return queue;
    }
}

A static queue instance is provided for quick access. A thread monitors the queue and processes messages asynchronously.

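public class LoopThread implements Runnable {
    // Note: log, executor, messageProcessor, and MAIN_THREAD_POOL_SIZE are members
    // of this class that the excerpt does not show.
    @Override
    public void run() {
        for (int i = 0; i < MAIN_THREAD_POOL_SIZE; i++) {
            executor.execute(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        // take() removes the head of the queue and blocks while the queue is empty
                        NettyMsgModel nettyMsgModel = QueueHolder.get().take();
                        messageProcessor.process(nettyMsgModel);
                    } catch (InterruptedException e) {
                        log.error(e.getMessage(), e);
                        // Restore the interrupt flag so the loop exits instead of spinning
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }
    }
}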
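The consumer loop can be tried in isolation with plain strings. The following is a sketch under assumptions: QueueWorkerSketch and runOnce are hypothetical names, and String messages replace NettyMsgModel. It shows a worker that blocks on take(), survives a failing message, and stops cleanly when interrupted.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical worker, not part of the demo project: drains a queue until interrupted. */
class QueueWorkerSketch implements Runnable {
    private final BlockingQueue<String> queue;
    private final Consumer<String> processor;

    QueueWorkerSketch(BlockingQueue<String> queue, Consumer<String> processor) {
        this.queue = queue;
        this.processor = processor;
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                processor.accept(queue.take()); // take() blocks while the queue is empty
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag so the loop condition ends the worker
            } catch (RuntimeException e) {
                System.err.println("Message failed: " + e); // one bad message must not kill the worker
            }
        }
    }

    /** Feeds the messages through one worker, then stops it; returns what was processed. */
    static List<String> runOnce(List<String> messages) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
        List<String> seen = new CopyOnWriteArrayList<>();
        Thread worker = new Thread(new QueueWorkerSketch(queue, seen::add));
        worker.start();
        try {
            for (String m : messages) {
                queue.put(m);
            }
            long deadline = System.currentTimeMillis() + 2000;
            while (seen.size() < messages.size() && System.currentTimeMillis() < deadline) {
                Thread.sleep(10);
            }
            worker.interrupt(); // ask the worker to stop
            worker.join(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(runOnce(Arrays.asList("Hello World!", "Hello World Too!")));
    }
}
```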

2. Execution Class

The process method belongs to MessageProcessor, a singleton that may be executed by multiple threads.

public void process(NettyMsgModel nettyMsgModel) {
    String imei = nettyMsgModel.getImei();
    boolean locked = false; // true only if this call acquired the lock
    try {
        synchronized (this) { // Lock so that several messages from one device do not create duplicate clients
            if (redisCache.hasKey(NETTY_QUEUE_LOCK + imei)) { // The previous message is still being processed
                log.info("imei={} is being processed, re-queueing", imei);
                // Put the message back after a delay (a real project should use RocketMQ or RabbitMQ delayed consumption)
                new Timer().schedule(new TimerTask() {
                    @Override
                    public void run() {
                        QueueHolder.get().offer(nettyMsgModel);
                    }
                }, 2000);
                log.info("imei={} re-queue scheduled", imei);
                return;
            } else {
                // No message is in progress for this device, so take the lock
                redisCache.setCacheObject(NETTY_QUEUE_LOCK + imei, "1", 120, TimeUnit.SECONDS);
                locked = true;
            }
        }
        // A cached client exists: send through it
        if (NettyClientHolder.get().containsKey(imei)) {
            NettyClient nettyClient = NettyClientHolder.get().get(imei);
            if (null != nettyClient.getChannelFuture() && nettyClient.getChannelFuture().channel().isActive()) { // Channel is active: send directly
                if (!nettyClient.getChannelFuture().channel().isWritable()) {
                    log.warn("Warning: channel not writable, imei={}, channelId={}", nettyClient.getImei(), nettyClient.getChannelFuture().channel().id());
                }
                nettyClient.send(nettyMsgModel.getMsg());
            } else {
                log.info("client imei={}, channel inactive, closing it", nettyClient.getImei());
                nettyClient.close();
                // Create a new client and send
                this.createClientAndSend(nettyMsgModel);
            }
        } else { // No cached client: create a new one
            this.createClientAndSend(nettyMsgModel);
        }
    } catch (Exception e) {
        log.error(e.getMessage(), e);
    } finally {
        // Release the lock only if this call acquired it; the re-queue path
        // must not delete a lock held by the thread that is still processing
        if (locked) {
            redisCache.deleteObject(NETTY_QUEUE_LOCK + imei);
        }
    }
}

The method uses the device IMEI as the unique key. A Redis lock, checked and set inside a synchronized block, prevents duplicate client creation for the same device; if the lock is already held, the message is re-queued after a 2-second delay.
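The locking idea can be illustrated with an in-memory stand-in. This is a sketch under assumptions, not the project's Redis code: DeviceLockSketch is a hypothetical class, and ConcurrentHashMap.putIfAbsent plays the role of an atomic "set if not exists" (Redis offers this as SET with the NX option). Each holder receives a token, so only the thread that took the lock can release it. Unlike the Redis key, this version has no expiry.

```java
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** In-memory stand-in for a per-device lock; a real deployment would use Redis. */
class DeviceLockSketch {
    private final ConcurrentHashMap<String, String> locks = new ConcurrentHashMap<>();

    /** Atomically takes the lock; returns an owner token, or null if it is already held. */
    String tryLock(String imei) {
        String token = UUID.randomUUID().toString();
        return locks.putIfAbsent(imei, token) == null ? token : null;
    }

    /** Releases the lock only if the caller still owns it. */
    boolean unlock(String imei, String token) {
        return token != null && locks.remove(imei, token);
    }

    public static void main(String[] args) {
        DeviceLockSketch lock = new DeviceLockSketch();
        String token = lock.tryLock("12345678");
        if (token == null) {
            System.out.println("busy: re-queue the message");
            return;
        }
        try {
            System.out.println("processing message for 12345678");
        } finally {
            lock.unlock("12345678", token);
        }
    }
}
```

Because acquisition is a single atomic call, no surrounding synchronized block is needed, and a caller that failed to acquire the lock has no token with which to release it.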

After acquiring the lock, it checks the cache for an existing connection; if present it sends the message, otherwise it creates a new client.

private void createClientAndSend(NettyMsgModel nettyMsgModel) {
    log.info("Creating client, imei={}", nettyMsgModel.getImei());
    // DemoClientHandler can be replaced with a handler for your own business logic
    NettyClient nettyClient = SpringUtils.getBean(NettyClient.class, nettyMsgModel.getImei(), nettyMsgModel.getBizData(),
            this.createDefaultWorkGroup(this.workerThread), DemoClientHandler.class);
    executor.execute(nettyClient); // Run the client initialization
    try {
        // Use the client's monitor to wait for activation
        synchronized (nettyClient) {
            long c1 = System.currentTimeMillis();
            nettyClient.wait(5000); // Block for at most 5 s; the wait ends even if the client is still inactive
            long c2 = System.currentTimeMillis();
            log.info("Client creation wait took {}ms", c2 - c1);
        }
        if (null != nettyClient.getChannelFuture() && nettyClient.getChannelFuture().channel().isActive()) { // Connected
            // Store the client in the cache
            NettyClientHolder.get().put(nettyMsgModel.getImei(), nettyClient);
            // Send the message now that the client is active
            nettyClient.send(nettyMsgModel.getMsg());
        } else { // Connection failed
            log.warn("Client creation failed, imei={}", nettyMsgModel.getImei());
            nettyClient.close();
            // The message could be re-queued here
        }
    } catch (Exception e) {
        log.error("Error while initializing the client and sending the message ===> {}", e.getMessage(), e);
    }
}

Client creation includes a wait (up to 5 seconds) for the channel to become active; if activation fails, the message can be re‑queued.
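A wait/notify handshake can miss the signal if the channel becomes active before the waiting thread enters its synchronized block; the waiter then sleeps for the full timeout. One alternative, shown here as a sketch and not as the project's implementation, is a CountDownLatch, which remembers that the event already happened. ActivationGateSketch is a hypothetical name.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical one-shot gate: the handler opens it, the queue thread waits on it. */
class ActivationGateSketch {
    private final CountDownLatch active = new CountDownLatch(1);

    /** Called once the channel is active, e.g. from channelActive. */
    void markActive() {
        active.countDown();
    }

    /** Waits up to the timeout; returns true if activation happened, even before this call. */
    boolean awaitActive(long timeout, TimeUnit unit) {
        try {
            return active.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        ActivationGateSketch gate = new ActivationGateSketch();
        new Thread(gate::markActive).start();
        System.out.println("active=" + gate.awaitActive(5, TimeUnit.SECONDS));
    }
}
```

The boolean result also tells the caller whether the wait ended by activation or by timeout, which a bare wait(5000) does not.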

3. Client

The Netty client is a prototype‑scoped bean; each instance binds to a thread and holds its own business data.

package org.example.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.example.client.handler.BaseClientHandler;
import org.example.core.util.SpringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author ReWind00
 * @date 2023/2/15 9:59
 */
@Slf4j
@Component
@Scope("prototype")
@Getter
@NoArgsConstructor
public class NettyClient implements Runnable {

    @Value("${netty.server.port}")
    private int port;

    @Value("${netty.server.host}")
    private String host;
    // Unique client identifier
    private String imei;
    // Custom business data
    private Map<String, Object> bizData;

    private EventLoopGroup workGroup;

    private Class<BaseClientHandler> clientHandlerClass;

    private ChannelFuture channelFuture;

    public NettyClient(String imei, Map<String, Object> bizData, EventLoopGroup workGroup, Class<BaseClientHandler> clientHandlerClass) {
        this.imei = imei;
        this.bizData = bizData;
        this.workGroup = workGroup;
        this.clientHandlerClass = clientHandlerClass;
    }

    @Override
    public void run() {
        try {
            this.init();
            log.info("Client started, imei={}", imei);
        } catch (Exception e) {
            log.error("Client failed to start: {}", e.getMessage(), e);
        }
    }

    public void close() {
        if (null != this.channelFuture) {
            this.channelFuture.channel().close();
        }
        NettyClientHolder.get().remove(this.imei);
    }

    public void send(String message) {
        try {
            // channelFuture stays null if the connection was never established
            if (null == this.channelFuture || !this.channelFuture.channel().isActive()) {
                log.info("Channel inactive, imei={}", this.imei);
                return;
            }
            if (!StringUtils.isEmpty(message)) {
                log.info("Sending queued message ===> {}", message);
                this.channelFuture.channel().writeAndFlush(message);
            }
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }

    private void init() throws Exception {
        // Pass this instance to the handler
        BaseClientHandler clientHandler = SpringUtils.getBean(clientHandlerClass, this);
        Bootstrap b = new Bootstrap();
        // Configure the client through the Bootstrap helper
        b.group(workGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                .option(ChannelOption.SO_RCVBUF, 1024 * 32)
                .option(ChannelOption.SO_SNDBUF, 1024 * 32)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        // Split inbound data into frames on the "\n" delimiter
                        ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024 * 1024, Unpooled.copiedBuffer("\n".getBytes())));
                        ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8)); // Encode outbound Strings
                        ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8)); // Decode inbound frames to Strings
                        // Heartbeat: fire an ALL_IDLE event after 600 s without reads or writes
                        ch.pipeline().addLast(new IdleStateHandler(0, 0, 600, TimeUnit.SECONDS));
                        ch.pipeline().addLast(clientHandler);
                    }
                });
        this.connect(b);
    }

    private void connect(Bootstrap b) throws InterruptedException {
        long c1 = System.currentTimeMillis();
        final int maxRetries = 2; // Retry at most twice
        final AtomicInteger count = new AtomicInteger();
        final AtomicBoolean flag = new AtomicBoolean(false);
        try {
            this.channelFuture = b.connect(host, port).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        if (count.incrementAndGet() > maxRetries) {
                            log.warn("imei={} exceeded {} reconnect attempts", imei, maxRetries);
                        } else {
                            log.info("imei={} reconnect attempt {}", imei, count);
                            b.connect(host, port).addListener(this);
                        }
                    } else {
                        log.info("imei={} connected, host: {} port: {}", imei, host, port);
                        flag.set(true);
                    }
                }
            }).sync(); // Wait for the first connection attempt to finish
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
        if (channelFuture == null) { // sync() threw before the assignment, so there is no channel to report
            log.warn("imei={} connection failed after {}ms", imei, System.currentTimeMillis() - c1);
            return;
        }
        log.info("Device imei={}, channelId={} connected in {}ms", imei, channelFuture.channel().id(), System.currentTimeMillis() - c1);
        if (flag.get()) {
            channelFuture.channel().closeFuture().sync(); // After a successful connect, block this thread until the channel closes
        }
    }
}

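Failed connections are retried at most twice; after that the client gives up. Note that the listener discards the future returned by a retry, so channelFuture is only set when the first attempt succeeds. The EventLoopGroup is passed in so that many clients can share its threads, which reduces resource consumption.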
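The retry policy can be sketched independently of Netty. The helper below is illustrative only: RetrySketch and connectWithRetry are hypothetical names, and the Supplier stands in for one connection attempt. It makes the arithmetic explicit (two retries means at most three attempts) and returns the handle from whichever attempt succeeded, so the caller never keeps a reference to a failed one.

```java
import java.util.Optional;
import java.util.function.Supplier;

class RetrySketch {
    /**
     * Runs the attempt once, then up to maxRetries more times while it fails.
     * Returns the result of the first successful attempt, or empty if all failed.
     */
    static <T> Optional<T> connectWithRetry(Supplier<Optional<T>> attempt, int maxRetries) {
        for (int i = 0; i <= maxRetries; i++) {
            Optional<T> result = attempt.get();
            if (result.isPresent()) {
                return result; // hand back the connection that actually succeeded
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated connector: fails twice, then succeeds on the third attempt
        Optional<String> channel = connectWithRetry(
                () -> ++calls[0] < 3 ? Optional.<String>empty() : Optional.of("channel-" + calls[0]), 2);
        System.out.println(channel.orElse("gave up") + " after " + calls[0] + " attempts");
    }
}
```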

4. Handler

The DemoClientHandler processes channel events.

package org.example.client.handler;

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
import org.example.client.NettyClient;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * @author ReWind00
 * @date 2023/2/15 10:09
 */
@Slf4j
@Component
@Scope("prototype")
public class DemoClientHandler extends BaseClientHandler {

    private final String imei;
    private final Map<String, Object> bizData;
    private final NettyClient nettyClient;
    private int allIdleCounter = 0;
    private static final int MAX_IDLE_TIMES = 3;

    public DemoClientHandler(NettyClient nettyClient) {
        this.nettyClient = nettyClient;
        this.imei = nettyClient.getImei();
        this.bizData = nettyClient.getBizData();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("Client imei={}, channel activated", this.imei);
        synchronized (this.nettyClient) { // Once the channel is active, wake the queue thread so it can send the message
            this.nettyClient.notify();
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.warn("Client imei={}, channel disconnected", this.imei);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("Client imei={}, received message: {}", this.imei, msg);
        // Handle business logic here...
        if ("shutdown".equals(msg)) {
            this.nettyClient.close();
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent e = (IdleStateEvent) evt;
            boolean flag = false;
            if (e.state() == IdleState.ALL_IDLE) {
                this.allIdleCounter++;
                log.info("Client imei={}, read/write idle event #{}", this.imei, this.allIdleCounter);
                if (this.allIdleCounter >= MAX_IDLE_TIMES) {
                    flag = true;
                }
            }
            if (flag) {
                log.warn("Read/write idle reached {} times, closing the connection", MAX_IDLE_TIMES);
                ctx.channel().close();
            }
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("Client imei={}, connection error {}", imei, cause.getMessage(), cause);
    }
}

On channel activation it notifies the waiting client thread; on read it logs the received message and can handle business logic, including a “shutdown” command that closes the client.
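The handler also counts ALL_IDLE events and closes the channel on the third one. In the listing the counter is never reset, so three idle periods at any point in the connection's lifetime close it. The sketch below shows one possible variation, which is an assumption and not the author's behavior: inbound traffic clears the streak. IdleCounterSketch is a hypothetical class with no Netty dependency; it assumes all calls come from the channel's single event-loop thread, so it uses no synchronization.

```java
/** Hypothetical idle tracker: closes after N consecutive idle events. */
class IdleCounterSketch {
    private final int maxIdleTimes;
    private int idleCount;

    IdleCounterSketch(int maxIdleTimes) {
        this.maxIdleTimes = maxIdleTimes;
    }

    /** Call on each ALL_IDLE event; returns true when the channel should be closed. */
    boolean onIdle() {
        return ++idleCount >= maxIdleTimes;
    }

    /** Call on each inbound message; traffic clears the idle streak. */
    void onRead() {
        idleCount = 0;
    }

    public static void main(String[] args) {
        IdleCounterSketch counter = new IdleCounterSketch(3);
        counter.onIdle();
        counter.onIdle();
        counter.onRead(); // a message arrives, so the streak starts over
        System.out.println("close after next idle event? " + counter.onIdle());
    }
}
```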

5. Client Cache

Since Netty channels cannot be serialized, the clients are stored in a local ConcurrentHashMap (NettyClientHolder).

package org.example.client;

import java.util.concurrent.ConcurrentHashMap;
/**
 * @author ReWind00
 * @date 2023/2/15 11:01
 */
public class NettyClientHolder {
    private static final ConcurrentHashMap<String, NettyClient> clientMap = new ConcurrentHashMap<>();
    public static ConcurrentHashMap<String, NettyClient> get() {
        return clientMap;
    }
}
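One detail worth considering with such a cache: removing by key alone can evict a newer client that was registered under the same IMEI after a reconnect. A minimal sketch of a guarded removal follows, using ConcurrentHashMap.remove(key, value); ClientRegistrySketch is a hypothetical wrapper, not a class from the project.

```java
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical registry: removal succeeds only for the client that is currently mapped. */
class ClientRegistrySketch<C> {
    private final ConcurrentHashMap<String, C> clients = new ConcurrentHashMap<>();

    /** Registers a client and returns the one it replaced, or null. */
    C register(String imei, C client) {
        return clients.put(imei, client);
    }

    C find(String imei) {
        return clients.get(imei);
    }

    /** Removes the entry only if it still maps to this client (compared with equals). */
    boolean unregister(String imei, C client) {
        return clients.remove(imei, client);
    }

    public static void main(String[] args) {
        ClientRegistrySketch<Object> registry = new ClientRegistrySketch<>();
        Object oldClient = new Object();
        Object newClient = new Object();
        registry.register("87654321", oldClient);
        registry.register("87654321", newClient); // reconnect replaces the old client
        System.out.println("stale close removed entry? " + registry.unregister("87654321", oldClient));
        System.out.println("new client still cached? " + (registry.find("87654321") == newClient));
    }
}
```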

5. Testing

A Spring Boot controller provides three test endpoints:

/demo/testOne: sends two messages 5 seconds apart.
/demo/testTwo: sends an arbitrary message to a specified IMEI.
/demo/testThree: sends two messages for the same IMEI, demonstrating re-queueing due to the lock.

package org.example.client.controller;

import org.example.client.QueueHolder;
import org.example.client.model.NettyMsgModel;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
 * @author ReWind00
 * @date 2023/2/15 13:48
 */
@RestController
@RequestMapping("/demo")
public class DemoController {

    /**
     * Send two messages five seconds apart
     */
    @GetMapping("testOne")
    public void testOne() {
        QueueHolder.get().offer(NettyMsgModel.create("87654321", "Hello World!"));
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of printing the stack trace
            Thread.currentThread().interrupt();
        }
        QueueHolder.get().offer(NettyMsgModel.create("87654321", "Hello World Too!"));
    }

    /**
     * Send arbitrary message
     */
    @GetMapping("testTwo")
    public void testTwo(@RequestParam String imei, @RequestParam String msg) {
        QueueHolder.get().offer(NettyMsgModel.create(imei, msg));
    }

    /**
     * Send two messages; the second will be re‑queued due to Redis lock
     */
    @GetMapping("testThree")
    public void testThree() {
        QueueHolder.get().offer(NettyMsgModel.create("12345678", "Hello World!"));
        QueueHolder.get().offer(NettyMsgModel.create("12345678", "Hello World Too!"));
    }
}

Log screenshots (omitted) show the first message triggering client creation and sending, the second using the existing channel, and the third test demonstrating delayed consumption after re‑queueing.
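The test endpoints enqueue with offer and ignore its result. On a bounded queue, offer returns false when the queue is full, so a message can be dropped silently under load. A small sketch of an enqueue helper that surfaces this, assuming a short timeout is acceptable to the caller; EnqueueSketch and enqueue are hypothetical names.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class EnqueueSketch {
    /** Tries to enqueue within the timeout; returns false if the queue stays full. */
    static <T> boolean enqueue(BlockingQueue<T> queue, T item, long timeoutMillis) {
        try {
            return queue.offer(item, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        System.out.println("first accepted: " + enqueue(queue, "Hello World!", 10));
        System.out.println("second accepted: " + enqueue(queue, "Hello World Too!", 10));
    }
}
```

A controller could use the boolean to return an error response instead of reporting success for a message that was never queued.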

6. Source Code

https://gitee.com/jaster/netty-tcp-demo

Postscript

The demo is intended for learning and exchange only; applying it to a production environment may require additional improvements. Readers are welcome to leave comments for discussion.
