Netty TCP Client Demo with Spring Boot: Architecture, Implementation, and Testing

This article presents a complete Netty TCP client demo built with Spring Boot, detailing the project architecture, module layout, business flow, key code implementations—including a local BlockingQueue, message processing with Redis locks, client initialization, handler logic, and testing endpoints—providing a practical reference for backend developers.

Top Architect
Top Architect
Top Architect
Netty TCP Client Demo with Spring Boot: Architecture, Implementation, and Testing

The author, a senior architect, shares a demo project for an IoT scenario that requires long‑living socket connections; the demo uses Netty, Redis, and Spring Boot 2.2.0 to illustrate a reliable message‑driven communication pipeline.

Project Architecture : The source is divided into three Maven modules – netty-tcp-core (utility classes), netty-tcp-server (a test server, not used in production), and netty-tcp-client (the focus of the article). A local ArrayBlockingQueue replaces a real message broker for demonstration purposes.

Business Flow : Producer → Message Queue → Consumer (client) → TCP channel → Server → TCP channel → Client. When a message arrives, the system checks whether a channel for the device (identified by IMEI) already exists and is active; if not, it creates a new Netty client, waits for the channel to become active, and then sends the message.

Key Code Snippets :

Queue holder:

package org.example.client;

import org.example.client.model.NettyMsgModel;
import java.util.concurrent.ArrayBlockingQueue;
/**
 * Demo uses a local queue; production should use a real MQ (RocketMQ/RabbitMQ).
 */
public class QueueHolder {
    private static final ArrayBlockingQueue<NettyMsgModel> queue = new ArrayBlockingQueue<>(100);
    public static ArrayBlockingQueue<NettyMsgModel> get() { return queue; }
}

Loop thread that continuously consumes messages:

public class LoopThread implements Runnable {
    @Override
    public void run() {
        for (int i = 0; i < MAIN_THREAD_POOL_SIZE; i++) {
            executor.execute(() -> {
                while (true) {
                    try {
                        NettyMsgModel nettyMsgModel = QueueHolder.get().take();
                        messageProcessor.process(nettyMsgModel);
                    } catch (InterruptedException e) {
                        log.error(e.getMessage(), e);
                    }
                }
            });
        }
    }
}

Message processing with Redis lock and client creation:

public void process(NettyMsgModel nettyMsgModel) {
    String imei = nettyMsgModel.getImei();
    try {
        synchronized (this) {
            if (redisCache.hasKey(NETTY_QUEUE_LOCK + imei)) {
                log.info("imei={} message processing, re‑queue", imei);
                new Timer().schedule(new TimerTask() {
                    @Override public void run() { QueueHolder.get().offer(nettyMsgModel); }
                }, 2000);
                return;
            } else {
                redisCache.setCacheObject(NETTY_QUEUE_LOCK + imei, "1", 120, TimeUnit.SECONDS);
            }
        }
        if (NettyClientHolder.get().containsKey(imei)) {
            NettyClient nettyClient = NettyClientHolder.get().get(imei);
            if (nettyClient.getChannelFuture() != null && nettyClient.getChannelFuture().channel().isActive()) {
                nettyClient.send(nettyMsgModel.getMsg());
            } else {
                createClientAndSend(nettyMsgModel);
            }
        } else {
            createClientAndSend(nettyMsgModel);
        }
    } catch (Exception e) { log.error(e.getMessage(), e); }
    finally { redisCache.deleteObject(NETTY_QUEUE_LOCK + imei); }
}

private void createClientAndSend(NettyMsgModel nettyMsgModel) {
    NettyClient nettyClient = SpringUtils.getBean(NettyClient.class, nettyMsgModel.getImei(), nettyMsgModel.getBizData(), createDefaultWorkGroup(this.workerThread), DemoClientHandler.class);
    executor.execute(nettyClient);
    synchronized (nettyClient) {
        nettyClient.wait(5000);
    }
    if (nettyClient.getChannelFuture() != null && nettyClient.getChannelFuture().channel().isActive()) {
        NettyClientHolder.get().put(nettyMsgModel.getImei(), nettyClient);
        nettyClient.send(nettyMsgModel.getMsg());
    } else {
        log.warn("client creation failed, imei={}", nettyMsgModel.getImei());
        nettyClient.close();
    }
}

Netty client implementation (bootstrap, reconnect, pipeline):

package org.example.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.example.client.handler.BaseClientHandler;
import org.example.core.util.SpringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
@Component
@Scope("prototype")
@Getter
@NoArgsConstructor
public class NettyClient implements Runnable {
    @Value("${netty.server.port}") private int port;
    @Value("${netty.server.host}") private String host;
    private String imei;
    private Map<String, Object> bizData;
    private EventLoopGroup workGroup;
    private Class<BaseClientHandler> clientHandlerClass;
    private ChannelFuture channelFuture;
    public NettyClient(String imei, Map<String, Object> bizData, EventLoopGroup workGroup, Class<BaseClientHandler> clientHandlerClass) {
        this.imei = imei; this.bizData = bizData; this.workGroup = workGroup; this.clientHandlerClass = clientHandlerClass;
    }
    @Override public void run() { try { init(); log.info("client started imei={}", imei); } catch (Exception e) { log.error("client start failed:{}", e.getMessage(), e); } }
    public void close() { if (channelFuture != null) channelFuture.channel().close(); NettyClientHolder.get().remove(this.imei); }
    public void send(String message) { try { if (!channelFuture.channel().isActive()) { log.info("channel inactive imei={}", imei); return; } if (!StringUtils.isEmpty(message)) { log.info("sending message=>{}", message); channelFuture.channel().writeAndFlush(message); } } catch (Exception e) { log.error(e.getMessage(), e); } }
    private void init() throws Exception {
        BaseClientHandler clientHandler = SpringUtils.getBean(clientHandlerClass, this);
        Bootstrap b = new Bootstrap();
        b.group(workGroup)
         .channel(NioSocketChannel.class)
         .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
         .option(ChannelOption.SO_RCVBUF, 1024 * 32)
         .option(ChannelOption.SO_SNDBUF, 1024 * 32)
         .handler(new ChannelInitializer<SocketChannel>() {
             @Override protected void initChannel(SocketChannel ch) throws Exception {
                 ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024 * 1024, Unpooled.copiedBuffer("
".getBytes())));
                 ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
                 ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
                 ch.pipeline().addLast(new IdleStateHandler(0, 0, 600, TimeUnit.SECONDS));
                 ch.pipeline().addLast(clientHandler);
             }
         });
        connect(b);
    }
    private void connect(Bootstrap b) throws InterruptedException {
        long start = System.currentTimeMillis();
        final int maxRetries = 2;
        AtomicInteger count = new AtomicInteger();
        AtomicBoolean success = new AtomicBoolean(false);
        channelFuture = b.connect(host, port).addListener((ChannelFutureListener) future -> {
            if (!future.isSuccess()) {
                if (count.incrementAndGet() > maxRetries) {
                    log.warn("imei={} exceeded {} retries", imei, maxRetries);
                } else {
                    log.info("imei={} reconnect attempt {}", imei, count);
                    b.connect(host, port).addListener(this);
                }
            } else {
                log.info("imei={} connected to {}:{}", imei, host, port);
                success.set(true);
            }
        }).sync();
        log.info("device imei={} channelId={} connect time={}ms", imei, channelFuture.channel().id(), System.currentTimeMillis() - start);
        if (success.get()) channelFuture.channel().closeFuture().sync();
    }
}

Demo client handler that processes channel events and messages:

package org.example.client.handler;

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
import org.example.client.NettyClient;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import java.util.Map;

@Slf4j
@Component
@Scope("prototype")
public class DemoClientHandler extends BaseClientHandler {
    private final String imei;
    private final Map<String, Object> bizData;
    private final NettyClient nettyClient;
    private int allIdleCounter = 0;
    private static final int MAX_IDLE_TIMES = 3;
    public DemoClientHandler(NettyClient nettyClient) { this.nettyClient = nettyClient; this.imei = nettyClient.getImei(); this.bizData = nettyClient.getBizData(); }
    @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info("client imei={} channel active", imei); synchronized (nettyClient) { nettyClient.notify(); } }
    @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { log.warn("client imei={} channel inactive", imei); }
    @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.info("client imei={} received: {}", imei, msg); if ("shutdown".equals(msg)) nettyClient.close(); }
    @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { allIdleCounter++; log.info("client imei={} idle count {}", imei, allIdleCounter); if (allIdleCounter >= MAX_IDLE_TIMES) ctx.channel().close(); } }
    @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.error("client imei={} exception {}", imei, cause.getMessage(), cause); }
}

Cache holder for client instances:

package org.example.client;

import java.util.concurrent.ConcurrentHashMap;
public class NettyClientHolder {
    private static final ConcurrentHashMap<String, NettyClient> clientMap = new ConcurrentHashMap<>();
    public static ConcurrentHashMap<String, NettyClient> get() { return clientMap; }
}

REST controller used for testing the demo (offers, testOne, testTwo, testThree):

package org.example.client.controller;

import org.example.client.QueueHolder;
import org.example.client.model.NettyMsgModel;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("/demo")
public class DemoController {
    @GetMapping("testOne")
    public void testOne() {
        QueueHolder.get().offer(NettyMsgModel.create("87654321", "Hello World!"));
        Thread.sleep(5000);
        QueueHolder.get().offer(NettyMsgModel.create("87654321", "Hello World Too!"));
    }
    @GetMapping("testTwo")
    public void testTwo(@RequestParam String imei, @RequestParam String msg) { QueueHolder.get().offer(NettyMsgModel.create(imei, msg)); }
    @GetMapping("testThree")
    public void testThree() {
        QueueHolder.get().offer(NettyMsgModel.create("12345678", "Hello World!"));
        QueueHolder.get().offer(NettyMsgModel.create("12345678", "Hello World Too!"));
    }
}

Testing results show that the first message triggers client creation, subsequent messages reuse the active channel, and when a lock is present the second message is re‑queued for delayed consumption.

The complete source code is hosted at https://gitee.com/jaster/netty-tcp-demo .

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

javaNettyTCPSpring BootMessage Queue
Top Architect
Written by

Top Architect

Top Architect focuses on sharing practical architecture knowledge, covering enterprise, system, website, large‑scale distributed, and high‑availability architectures, plus architecture adjustments using internet technologies. We welcome idea‑driven, sharing‑oriented architects to exchange and learn together.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.