Netty TCP Client Demo: Architecture Overview, Code Walkthrough, and Testing

This article presents a complete Netty TCP client demo for an IoT project, explaining the project background, architecture, modules, business flow, and detailed Java code—including a local queue, thread pool, client initialization, handler logic, and test controllers—along with deployment notes and source links.

Top Architect
Top Architect
Top Architect
Netty TCP Client Demo: Architecture Overview, Code Walkthrough, and Testing

Project Background – An IoT project required a long‑living socket connection for message communication. After several bugs, a demo was created to share a clean version that removes business‑specific code and focuses on the core Netty‑based solution.

Architecture – The demo uses Netty, Redis, and Spring Boot 2.2.0. The project is divided into three modules: netty-tcp-core (utility classes), netty-tcp-server (test server, not used in production), and netty-tcp-client (the main focus).

Business Flow – Messages are produced, placed into a local BlockingQueue (simulating RocketMQ), and consumed by a pool of worker threads. Each message is processed by checking Redis for a lock, locating or creating a Netty client for the device IMEI, and sending the payload through an active TCP channel.

1. Message Queue

package org.example.client;

import org.example.client.model.NettyMsgModel;
import java.util.concurrent.ArrayBlockingQueue;
/**
 * Demo uses a local queue; in production replace with RocketMQ or RabbitMQ.
 */
public class QueueHolder {
    private static final ArrayBlockingQueue<NettyMsgModel> queue = new ArrayBlockingQueue<>(100);
    public static ArrayBlockingQueue<NettyMsgModel> get() { return queue; }
}

2. Loop Thread

public class LoopThread implements Runnable {
    @Override
    public void run() {
        for (int i = 0; i < MAIN_THREAD_POOL_SIZE; i++) {
            executor.execute(() -> {
                while (true) {
                    try {
                        NettyMsgModel nettyMsgModel = QueueHolder.get().take();
                        messageProcessor.process(nettyMsgModel);
                    } catch (InterruptedException e) {
                        log.error(e.getMessage(), e);
                    }
                }
            });
        }
    }
}

3. Execution Class

public void process(NettyMsgModel nettyMsgModel) {
    String imei = nettyMsgModel.getImei();
    try {
        synchronized (this) {
            if (redisCache.hasKey(NETTY_QUEUE_LOCK + imei)) {
                log.info("imei={} message processing, re‑queue", imei);
                new Timer().schedule(new TimerTask() {
                    @Override
                    public void run() { QueueHolder.get().offer(nettyMsgModel); }
                }, 2000);
                return;
            } else {
                redisCache.setCacheObject(NETTY_QUEUE_LOCK + imei, "1", 120, TimeUnit.SECONDS);
            }
        }
        if (NettyClientHolder.get().containsKey(imei)) {
            NettyClient nettyClient = NettyClientHolder.get().get(imei);
            if (nettyClient.getChannelFuture() != null && nettyClient.getChannelFuture().channel().isActive()) {
                nettyClient.send(nettyMsgModel.getMsg());
            } else {
                nettyClient.close();
                this.createClientAndSend(nettyMsgModel);
            }
        } else {
            this.createClientAndSend(nettyMsgModel);
        }
    } catch (Exception e) {
        log.error(e.getMessage(), e);
    } finally {
        redisCache.deleteObject(NETTY_QUEUE_LOCK + imei);
    }
}

4. Client Creation

private void createClientAndSend(NettyMsgModel nettyMsgModel) {
    log.info("Creating client for imei={}", nettyMsgModel.getImei());
    NettyClient nettyClient = SpringUtils.getBean(NettyClient.class, nettyMsgModel.getImei(), nettyMsgModel.getBizData(), this.createDefaultWorkGroup(this.workerThread), DemoClientHandler.class);
    executor.execute(nettyClient);
    try {
        synchronized (nettyClient) {
            long start = System.currentTimeMillis();
            nettyClient.wait(5000);
            long end = System.currentTimeMillis();
            log.info("Client wait time={}ms", end - start);
        }
        if (nettyClient.getChannelFuture() != null && nettyClient.getChannelFuture().channel().isActive()) {
            NettyClientHolder.get().put(nettyMsgModel.getImei(), nettyClient);
            nettyClient.send(nettyMsgModel.getMsg());
        } else {
            log.warn("Client creation failed imei={}", nettyMsgModel.getImei());
            nettyClient.close();
        }
    } catch (Exception e) {
        log.error("Client init error => {}", e.getMessage(), e);
    }
}

5. Netty Client

package org.example.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.example.client.handler.BaseClientHandler;
import org.example.core.util.SpringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
@Component
@Scope("prototype")
@Getter
@NoArgsConstructor
public class NettyClient implements Runnable {
    @Value("${netty.server.port}") private int port;
    @Value("${netty.server.host}") private String host;
    private String imei;
    private Map<String, Object> bizData;
    private EventLoopGroup workGroup;
    private Class<BaseClientHandler> clientHandlerClass;
    private ChannelFuture channelFuture;
    // constructor omitted for brevity
    @Override public void run() { try { init(); log.info("Client started imei={}", imei); } catch (Exception e) { log.error("Client start failed:{}", e.getMessage(), e); } }
    public void close() { if (channelFuture != null) channelFuture.channel().close(); NettyClientHolder.get().remove(imei); }
    public void send(String message) { try { if (!channelFuture.channel().isActive()) { log.info("Channel inactive imei={}", imei); return; } if (!StringUtils.isEmpty(message)) { channelFuture.channel().writeAndFlush(message); } } catch (Exception e) { log.error(e.getMessage(), e); } }
    private void init() throws Exception {
        BaseClientHandler clientHandler = SpringUtils.getBean(clientHandlerClass, this);
        Bootstrap b = new Bootstrap();
        b.group(workGroup)
         .channel(NioSocketChannel.class)
         .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
         .option(ChannelOption.SO_RCVBUF, 32 * 1024)
         .option(ChannelOption.SO_SNDBUF, 32 * 1024)
         .handler(new ChannelInitializer<SocketChannel>() {
             @Override protected void initChannel(SocketChannel ch) throws Exception {
                 ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024 * 1024, Unpooled.copiedBuffer("
".getBytes())));
                 ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
                 ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
                 ch.pipeline().addLast(new IdleStateHandler(0, 0, 600, TimeUnit.SECONDS));
                 ch.pipeline().addLast(clientHandler);
             }
         });
        connect(b);
    }
    private void connect(Bootstrap b) throws InterruptedException {
        long start = System.currentTimeMillis();
        final int maxRetries = 2;
        AtomicInteger count = new AtomicInteger();
        AtomicBoolean success = new AtomicBoolean(false);
        try {
            this.channelFuture = b.connect(host, port).addListener(future -> {
                if (!future.isSuccess()) {
                    if (count.incrementAndGet() > maxRetries) {
                        log.warn("imei={} exceeded {} retries", imei, maxRetries);
                    } else {
                        log.info("imei={} retry {}", imei, count);
                        b.connect(host, port).addListener(this);
                    }
                } else {
                    log.info("imei={} connected to {}:{}", imei, host, port);
                    success.set(true);
                }
            }).sync();
        } catch (Exception e) { log.error(e.getMessage(), e); }
        log.info("imei={} connection time={}ms", imei, System.currentTimeMillis() - start);
        if (success.get()) { channelFuture.channel().closeFuture().sync(); }
    }
}

6. Client Handler (DemoClientHandler)

package org.example.client.handler;

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
import org.example.client.NettyClient;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import java.util.Map;

@Slf4j
@Component
@Scope("prototype")
public class DemoClientHandler extends BaseClientHandler {
    private final String imei;
    private final Map<String, Object> bizData;
    private final NettyClient nettyClient;
    private int allIdleCounter = 0;
    private static final int MAX_IDLE_TIMES = 3;
    public DemoClientHandler(NettyClient nettyClient) {
        this.nettyClient = nettyClient;
        this.imei = nettyClient.getImei();
        this.bizData = nettyClient.getBizData();
    }
    @Override public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("Client imei={} channel active", imei);
        synchronized (nettyClient) { nettyClient.notify(); }
    }
    @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { log.warn("Client imei={} channel inactive", imei); }
    @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("Client imei={} received: {}", imei, msg);
        if ("shutdown".equals(msg)) { nettyClient.close(); }
    }
    @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent e = (IdleStateEvent) evt;
            if (e.state() == IdleState.ALL_IDLE) {
                allIdleCounter++;
                log.info("Client imei={} idle count {}", imei, allIdleCounter);
                if (allIdleCounter >= MAX_IDLE_TIMES) { ctx.channel().close(); }
            }
        }
    }
    @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.error("Client imei={} exception {}", imei, cause.getMessage(), cause); }
}

7. Client Holder

package org.example.client;

import java.util.concurrent.ConcurrentHashMap;
public class NettyClientHolder {
    private static final ConcurrentHashMap<String, NettyClient> clientMap = new ConcurrentHashMap<>();
    public static ConcurrentHashMap<String, NettyClient> get() { return clientMap; }
}

8. Test Controller

package org.example.client.controller;

import org.example.client.QueueHolder;
import org.example.client.model.NettyMsgModel;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/demo")
public class DemoController {
    @GetMapping("testOne")
    public void testOne() {
        QueueHolder.get().offer(NettyMsgModel.create("87654321", "Hello World!"));
        try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); }
        QueueHolder.get().offer(NettyMsgModel.create("87654321", "Hello World Too!"));
    }
    @GetMapping("testTwo")
    public void testTwo(@RequestParam String imei, @RequestParam String msg) {
        QueueHolder.get().offer(NettyMsgModel.create(imei, msg));
    }
    @GetMapping("testThree")
    public void testThree() {
        QueueHolder.get().offer(NettyMsgModel.create("12345678", "Hello World!"));
        QueueHolder.get().offer(NettyMsgModel.create("12345678", "Hello World Too!"));
    }
}

The test logs show that the first message triggers client creation, while the second message (after a 5‑second pause) is sent through the already‑active channel. When a lock is present, messages are re‑queued and consumed later, demonstrating the lock‑based deduplication logic.

All source code is available at https://gitee.com/jaster/netty-tcp-demo . The article concludes with a disclaimer that the demo is for learning only and may need further improvements before production use.

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

javaredisNettyTCPSpring Boot
Top Architect
Written by

Top Architect

Top Architect focuses on sharing practical architecture knowledge, covering enterprise, system, website, large‑scale distributed, and high‑availability architectures, plus architecture adjustments using internet technologies. We welcome idea‑driven, sharing‑oriented architects to exchange and learn together.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.