Netty TCP Demo: Architecture, Code Walkthrough, and Usage Guide

This article presents a comprehensive Netty TCP demo project, covering its architecture, module layout, business flow, detailed Java code for message queuing, client execution, handler implementation, testing endpoints, and deployment considerations, providing a practical learning resource for backend developers.

Selected Java Interview Questions
Selected Java Interview Questions
Selected Java Interview Questions
Netty TCP Demo: Architecture, Code Walkthrough, and Usage Guide

Project Background

The author needed a long‑lived socket connection for an IoT project, encountered many bugs, and decided to share a cleaned‑up demo that removes the messy business logic and focuses on the core Netty client implementation.

Main Content

1. Project Architecture

The project uses Netty, Redis, and Spring Boot 2.2.0.

2. Project Modules

The directory structure is shown in an image (omitted). The key modules are: netty-tcp-core – common utilities; netty-tcp-server – test server (not used in production); netty-tcp-client – the client, which is the focus of this article.

3. Business Flow

In the real project RocketMQ is used, but the demo replaces it with a BlockingQueue. The data flow is: Producer → Message Queue → Consumer (client) → TCP channel → Server → TCP channel → Client.

When the consumer receives a device message, it checks the cache for an existing connection. If the connection exists and the channel is active, the message is sent directly; otherwise a new channel is created and the message is sent after activation.

4. Code Details

1. Message Queue

Because the demo removes the middleware, a local queue is created to simulate the real scenario.

package org.example.client;

import org.example.client.model.NettyMsgModel;
import java.util.concurrent.ArrayBlockingQueue;

/**
 * This project is for demonstration; in production a message middleware such as RocketMQ or RabbitMQ should be used.
 *
 * @author ReWind00
 * @date 2023/2/15 11:20
 */
public class QueueHolder {
    private static final ArrayBlockingQueue<NettyMsgModel> queue = new ArrayBlockingQueue<>(100);

    public static ArrayBlockingQueue<NettyMsgModel> get() {
        return queue;
    }
}

A thread continuously takes messages from the queue and processes them asynchronously.

public class LoopThread implements Runnable {
    @Override
    public void run() {
        for (int i = 0; i < MAIN_THREAD_POOL_SIZE; i++) {
            executor.execute(() -> {
                while (true) {
                    // Take the head object from the BlockingQueue; block if empty.
                    try {
                        NettyMsgModel nettyMsgModel = QueueHolder.get().take();
                        messageProcessor.process(nettyMsgModel);
                    } catch (InterruptedException e) {
                        log.error(e.getMessage(), e);
                    }
                }
            });
        }
    }
}

2. Execution Class

The process method belongs to the MessageProcessor class, which is a singleton but may be invoked by multiple threads.

public void process(NettyMsgModel nettyMsgModel) {
    String imei = nettyMsgModel.getImei();
    try {
        synchronized (this) { // Prevent duplicate client creation for the same device.
            if (redisCache.hasKey(NETTY_QUEUE_LOCK + imei)) {
                log.info("imei={} message processing, re‑queueing", imei);
                // Re‑queue with a delay (in production use RocketMQ or RabbitMQ for delayed consumption).
                new Timer().schedule(new TimerTask() {
                    @Override
                    public void run() {
                        QueueHolder.get().offer(nettyMsgModel);
                    }
                }, 2000);
                log.info("imei={} message processing, re‑queue completed", imei);
                return;
            } else {
                // Acquire lock if no connection exists.
                redisCache.setCacheObject(NETTY_QUEUE_LOCK + imei, "1", 120, TimeUnit.SECONDS);
            }
        }
        // If the cache contains a client, send the message; otherwise create a new client.
        if (NettyClientHolder.get().containsKey(imei)) {
            NettyClient nettyClient = NettyClientHolder.get().get(imei);
            if (nettyClient.getChannelFuture() != null && nettyClient.getChannelFuture().channel().isActive()) {
                if (!nettyClient.getChannelFuture().channel().isWritable()) {
                    log.warn("Warning, channel not writable, imei={}, channelId={}", nettyClient.getImei(), nettyClient.getChannelFuture().channel().id());
                }
                nettyClient.send(nettyMsgModel.getMsg());
            } else {
                log.info("client imei={}, channel inactive, closing", nettyClient.getImei());
                nettyClient.close();
                this.createClientAndSend(nettyMsgModel);
            }
        } else {
            this.createClientAndSend(nettyMsgModel);
        }
    } catch (Exception e) {
        log.error(e.getMessage(), e);
    } finally {
        // Release lock after processing.
        redisCache.deleteObject(NETTY_QUEUE_LOCK + imei);
    }
}

The method uses the device IMEI as a cache key, applies a synchronized block and a Redis distributed lock to avoid duplicate client creation, and re‑queues messages with a delay when the lock is held.

After acquiring the lock, the thread creates a client if none exists, otherwise it sends the message directly.

private void createClientAndSend(NettyMsgModel nettyMsgModel) {
    log.info("Creating client, imei={}", nettyMsgModel.getImei());
    // DemoClientHandler can be customized per business.
    NettyClient nettyClient = SpringUtils.getBean(NettyClient.class, nettyMsgModel.getImei(), nettyMsgModel.getBizData(),
            this.createDefaultWorkGroup(this.workerThread), DemoClientHandler.class);
    executor.execute(nettyClient); // Execute client initialization.
    try {
        // Wait for client activation with a lock.
        synchronized (nettyClient) {
            long c1 = System.currentTimeMillis();
            nettyClient.wait(5000); // Auto‑unlock after 5 seconds if activation fails.
            long c2 = System.currentTimeMillis();
            log.info("Client creation wait time={}ms", c2 - c1);
        }
        if (nettyClient.getChannelFuture() != null && nettyClient.getChannelFuture().channel().isActive()) {
            // Store in cache.
            NettyClientHolder.get().put(nettyMsgModel.getImei(), nettyClient);
            // Send message after activation.
            nettyClient.send(nettyMsgModel.getMsg());
        } else {
            log.warn("Client creation failed, imei={}", nettyMsgModel.getImei());
            nettyClient.close();
            // Optionally re‑queue the message.
        }
    } catch (Exception e) {
        log.error("Client initialization exception => {}", e.getMessage(), e);
    }
}

3. Client

package org.example.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.example.client.handler.BaseClientHandler;
import org.example.core.util.SpringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author ReWind00
 * @date 2023/2/15 9:59
 */
@Slf4j
@Component
@Scope("prototype")
@Getter
@NoArgsConstructor
public class NettyClient implements Runnable {

    @Value("${netty.server.port}")
    private int port;

    @Value("${netty.server.host}")
    private String host;
    // Unique device identifier
    private String imei;
    // Custom business data
    private Map<String, Object> bizData;

    private EventLoopGroup workGroup;

    private Class<BaseClientHandler> clientHandlerClass;

    private ChannelFuture channelFuture;

    public NettyClient(String imei, Map<String, Object> bizData, EventLoopGroup workGroup, Class<BaseClientHandler> clientHandlerClass) {
        this.imei = imei;
        this.bizData = bizData;
        this.workGroup = workGroup;
        this.clientHandlerClass = clientHandlerClass;
    }

    @Override
    public void run() {
        try {
            this.init();
            log.info("Client started imei={}", imei);
        } catch (Exception e) {
            log.error("Client start failed:{}", e.getMessage(), e);
        }
    }

    public void close() {
        if (this.channelFuture != null) {
            this.channelFuture.channel().close();
        }
        NettyClientHolder.get().remove(this.imei);
    }

    public void send(String message) {
        try {
            if (!this.channelFuture.channel().isActive()) {
                log.info("Channel inactive imei={}", this.imei);
                return;
            }
            if (!StringUtils.isEmpty(message)) {
                log.info("Queue message sending => {}", message);
                this.channelFuture.channel().writeAndFlush(message);
            }
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }

    private void init() throws Exception {
        // Pass this instance to the handler.
        BaseClientHandler clientHandler = SpringUtils.getBean(clientHandlerClass, this);
        Bootstrap b = new Bootstrap();
        // Build client.
        b.group(workGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                .option(ChannelOption.SO_RCVBUF, 1024 * 32)
                .option(ChannelOption.SO_SNDBUF, 1024 * 32)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024 * 1024, Unpooled.copiedBuffer("
".getBytes())));
                        ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
                        ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
                        ch.pipeline().addLast(new IdleStateHandler(0, 0, 600, TimeUnit.SECONDS));
                        ch.pipeline().addLast(clientHandler);
                    }
                });
        this.connect(b);
    }

    private void connect(Bootstrap b) throws InterruptedException {
        long c1 = System.currentTimeMillis();
        final int maxRetries = 2; // Retry twice.
        final AtomicInteger count = new AtomicInteger();
        final AtomicBoolean flag = new AtomicBoolean(false);
        try {
            this.channelFuture = b.connect(host, port).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        if (count.incrementAndGet() > maxRetries) {
                            log.warn("imei={} reconnection exceeded {} times", imei, maxRetries);
                        } else {
                            log.info("imei={} reconnection attempt {}", imei, count);
                            b.connect(host, port).addListener(this);
                        }
                    } else {
                        log.info("imei={} connected successfully, IP:{} port:{}", imei, host, port);
                        flag.set(true);
                    }
                }
            }).sync(); // Synchronous connection.
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
        log.info("Device imei={} channelId={} connection time={}ms", imei, channelFuture.channel().id(), System.currentTimeMillis() - c1);
        if (flag.get()) {
            channelFuture.channel().closeFuture().sync(); // Block until channel closes.
        }
    }
}

The client runs as a prototype bean; each instance blocks until its channel is closed. It supports two reconnection attempts and shares a common EventLoopGroup to avoid excessive thread creation.

4. Client Handler

package org.example.client.handler;

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
import org.example.client.NettyClient;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * @author ReWind00
 * @date 2023/2/15 10:09
 */
@Slf4j
@Component
@Scope("prototype")
public class DemoClientHandler extends BaseClientHandler {

    private final String imei;
    private final Map<String, Object> bizData;
    private final NettyClient nettyClient;
    private int allIdleCounter = 0;
    private static final int MAX_IDLE_TIMES = 3;

    public DemoClientHandler(NettyClient nettyClient) {
        this.nettyClient = nettyClient;
        this.imei = nettyClient.getImei();
        this.bizData = nettyClient.getBizData();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("Client imei={} channel activated", this.imei);
        synchronized (this.nettyClient) { // Unlock the waiting thread after activation.
            this.nettyClient.notify();
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.warn("Client imei={} channel disconnected", this.imei);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("Client imei={} received message: {}", this.imei, msg);
        // Business processing ...
        if ("shutdown".equals(msg)) {
            this.nettyClient.close();
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent e = (IdleStateEvent) evt;
            boolean close = false;
            if (e.state() == IdleState.ALL_IDLE) {
                this.allIdleCounter++;
                log.info("Client imei={} idle count {}", this.imei, this.allIdleCounter);
                if (this.allIdleCounter >= MAX_IDLE_TIMES) {
                    close = true;
                }
            }
            if (close) {
                log.warn("Read/write idle {} times, closing connection", MAX_IDLE_TIMES);
                ctx.channel().close();
            }
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("Client imei={} connection exception {}", imei, cause.getMessage(), cause);
    }
}

The handler notifies the waiting client thread when the channel becomes active, processes incoming messages, and closes the connection on shutdown or after a configurable number of idle events.

5. Client Cache

package org.example.client;

import java.util.concurrent.ConcurrentHashMap;

/**
 * @author ReWind00
 * @date 2023/2/15 11:01
 */
public class NettyClientHolder {
    private static final ConcurrentHashMap<String, NettyClient> clientMap = new ConcurrentHashMap<>();

    public static ConcurrentHashMap<String, NettyClient> get() {
        return clientMap;
    }
}

Because Netty channels cannot be serialized, the cache is kept in a local ConcurrentHashMap rather than Redis.

5. Testing

A Spring Boot controller provides three test endpoints: testOne – sends two messages with a 5‑second interval; the first triggers client creation, the second uses the existing channel. testTwo – sends an arbitrary message to a specified IMEI. testThree – sends two messages for the same device; the second is re‑queued due to the Redis lock and processed later.

Log screenshots (omitted) demonstrate client creation, message sending, lock‑based re‑queueing, and graceful shutdown via a "shutdown" message.

6. Source Code

Repository: https://gitee.com/jaster/netty-tcp-demo

Afterword

The demo is intended for learning and exchange only; production use requires additional improvements and considerations.

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

javaredisNettyTCPSpring BootSocket
Selected Java Interview Questions
Written by

Selected Java Interview Questions

A professional Java tech channel sharing common knowledge to help developers fill gaps. Follow us!

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.