How to Build a Scalable Netty TCP Client with Redis Queue and Spring Boot

This article walks through a complete Netty TCP client demo for IoT messaging. It covers the project architecture, module layout, and business flow, then explains the code in detail: a local BlockingQueue standing in for RocketMQ, thread‑safe client creation guarded by a Redis lock, and test endpoints. The result is a practical reference for building robust socket‑based services.

Architect

Project Background

Recently a company IoT project required a long‑lived socket connection for message communication. After several buggy attempts, the author refined the working code into a demo project, stripping away business‑specific parts to share for learning.

Project Architecture

The demo uses Netty, Redis, and Spring Boot 2.2.0.

Project structure diagram

Project Modules

The directory structure is shown in the image above. netty-tcp-core is a common utilities module, netty-tcp-server is a test server (not used in production), and netty-tcp-client is the focus of this article.

Business Process

In the real project RocketMQ is used as the message queue; the demo replaces it with a local BlockingQueue. The data flow is:

producer -> message queue -> consumer (client) -> TCP channel -> server -> TCP channel -> client.

When the consumer receives a device message, it checks whether a connection to the server exists in the cache. If the channel is active, the message is sent directly; otherwise a new channel is created, and the message is sent after the channel becomes active. The client processes server responses accordingly.

Code Details

1. Message Queue

Because the demo removes the message middleware, a local queue is created to simulate the real scenario.

package org.example.client;
import org.example.client.model.NettyMsgModel;
import java.util.concurrent.ArrayBlockingQueue;
/**
 * Demo project uses a local queue; production should use a message middleware (RocketMQ or RabbitMQ)
 */
public class QueueHolder {
    private static final ArrayBlockingQueue<NettyMsgModel> queue = new ArrayBlockingQueue<>(100);
    public static ArrayBlockingQueue<NettyMsgModel> get() {
        return queue;
    }
}

LoopThread submits MAIN_THREAD_POOL_SIZE worker tasks to an executor; each worker continuously takes messages from the queue and hands them to the message processor.

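public class LoopThread implements Runnable {
    // executor, messageProcessor, log and MAIN_THREAD_POOL_SIZE are declared elsewhere
    // in the full class; they are not shown in this excerpt
    @Override
    public void run() {
        for (int i = 0; i < MAIN_THREAD_POOL_SIZE; i++) {
            executor.execute(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        // take the head of the BlockingQueue; blocks while the queue is empty
                        NettyMsgModel nettyMsgModel = QueueHolder.get().take();
                        messageProcessor.process(nettyMsgModel);
                    } catch (InterruptedException e) {
                        log.warn("Consumer thread interrupted, stopping", e);
                        // restore the interrupt flag so the loop condition ends the worker
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }
    }
}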

The take method blocks the thread until a message arrives.
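The producer side behaves differently from take: the test endpoints later in the article enqueue with offer, which returns false instead of blocking when the bounded queue is full. The following is a minimal sketch of both sides using only the JDK; the class QueueDemo and its method names are hypothetical and not part of the demo project, and it uses a timed poll instead of take so the example cannot hang.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper illustrating bounded-queue behavior; not part of the demo. */
class QueueDemo {

    /** Producer side: offer() never blocks; it returns false when the queue is full. */
    static boolean tryEnqueue(BlockingQueue<String> queue, String msg) {
        return queue.offer(msg);
    }

    /** Consumer side: waits up to timeoutMs for a message; returns null on timeout. */
    static String dequeue(BlockingQueue<String> queue, long timeoutMs) throws InterruptedException {
        return queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2); // capacity 2 for the demo
        System.out.println(tryEnqueue(queue, "a")); // true
        System.out.println(tryEnqueue(queue, "b")); // true
        System.out.println(tryEnqueue(queue, "c")); // false: queue is full, message is dropped
        System.out.println(dequeue(queue, 100));    // a
    }
}
```

A caller that must not lose messages would need to check the return value of offer, or use put, which blocks until space is available.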

2. Execution Class

The process method belongs to MessageProcessor, a singleton that may be invoked by multiple threads.

public void process(NettyMsgModel nettyMsgModel) {
    String imei = nettyMsgModel.getImei();
    boolean locked = false; // true only if this call acquired the lock
    try {
        synchronized (this) { // avoid creating duplicate clients for the same device
            if (redisCache.hasKey(NETTY_QUEUE_LOCK + imei)) {
                log.info("imei={} message processing, re-queue", imei);
                // put the message back on the queue after 2 seconds
                new Timer().schedule(new TimerTask() {
                    @Override
                    public void run() {
                        QueueHolder.get().offer(nettyMsgModel);
                    }
                }, 2000);
                log.info("imei={} re-queue scheduled", imei);
                return;
            } else {
                redisCache.setCacheObject(NETTY_QUEUE_LOCK + imei, "1", 120, TimeUnit.SECONDS);
                locked = true;
            }
        }
        if (NettyClientHolder.get().containsKey(imei)) {
            NettyClient nettyClient = NettyClientHolder.get().get(imei);
            if (nettyClient.getChannelFuture() != null && nettyClient.getChannelFuture().channel().isActive()) {
                if (!nettyClient.getChannelFuture().channel().isWritable()) {
                    log.warn("Channel not writable, imei={}, channelId={}", nettyClient.getImei(),
                             nettyClient.getChannelFuture().channel().id());
                }
                nettyClient.send(nettyMsgModel.getMsg());
            } else {
                log.info("client imei={}, channel inactive, closing", nettyClient.getImei());
                nettyClient.close();
                this.createClientAndSend(nettyMsgModel);
            }
        } else {
            this.createClientAndSend(nettyMsgModel);
        }
    } catch (Exception e) {
        log.error(e.getMessage(), e);
    } finally {
        // release only a lock this call acquired; the re-queue path returns early and
        // must not delete the lock held by the message still being processed
        if (locked) {
            redisCache.deleteObject(NETTY_QUEUE_LOCK + imei);
        }
    }
}

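The imei uniquely identifies a device. A lock key in Redis (NETTY_QUEUE_LOCK + imei, expiring after 120 seconds) prevents concurrent client creation for the same imei. Because the hasKey check and the subsequent set are two separate Redis calls, the pair is atomic only inside the synchronized block, that is, within a single JVM; running several client instances would require an atomic set‑if‑absent operation instead.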
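To illustrate what an atomic check‑and‑set lock looks like, here is a minimal in‑memory sketch. It swaps Redis for a ConcurrentHashMap so that it runs without any infrastructure: putIfAbsent plays the role of Redis SET with the NX flag. The class DeviceLock and its methods are hypothetical and not part of the demo; unlike the Redis key, this stand‑in has no expiry. With Spring Data Redis, the comparable call would be ValueOperations.setIfAbsent(key, value, timeout, unit); whether the demo's redisCache wrapper exposes it is not shown in the article.

```java
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** In-memory stand-in for an atomic per-device lock (hypothetical, no expiry). */
class DeviceLock {
    private final ConcurrentMap<String, String> locks = new ConcurrentHashMap<>();

    /** Returns an owner token if the lock was acquired, or null if it is already held. */
    String tryLock(String imei) {
        String token = UUID.randomUUID().toString();
        // putIfAbsent is one atomic check-and-set, so two callers can never both win
        return locks.putIfAbsent(imei, token) == null ? token : null;
    }

    /** Releases the lock only if the given token is the one that acquired it. */
    boolean unlock(String imei, String token) {
        return token != null && locks.remove(imei, token);
    }

    public static void main(String[] args) {
        DeviceLock lock = new DeviceLock();
        String token = lock.tryLock("12345678");
        System.out.println(token != null);                     // true: first caller wins
        System.out.println(lock.tryLock("12345678") == null);  // true: second caller is rejected
        System.out.println(lock.unlock("12345678", token));    // true: owner releases
    }
}
```

The owner token serves the same purpose as tracking whether the current call acquired the lock: a caller that did not acquire it cannot release it.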

3. Client

The Netty client is a prototype‑scoped bean; each instance runs in its own thread and shares a common EventLoopGroup passed from outside to reduce resource consumption.

package org.example.client;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.example.client.handler.BaseClientHandler;
import org.example.core.util.SpringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
@Component
@Scope("prototype")
@Getter
@NoArgsConstructor
public class NettyClient implements Runnable {
    @Value("${netty.server.port}")
    private int port;
    @Value("${netty.server.host}")
    private String host;
    private String imei; // client unique identifier
    private Map<String, Object> bizData; // custom business data
    private EventLoopGroup workGroup;
    private Class<BaseClientHandler> clientHandlerClass;
    private ChannelFuture channelFuture;

    public NettyClient(String imei, Map<String, Object> bizData, EventLoopGroup workGroup,
                       Class<BaseClientHandler> clientHandlerClass) {
        this.imei = imei;
        this.bizData = bizData;
        this.workGroup = workGroup;
        this.clientHandlerClass = clientHandlerClass;
    }

    @Override
    public void run() {
        try {
            this.init();
            log.info("Client start imei={}", imei);
        } catch (Exception e) {
            log.error("Client start failed: {}", e.getMessage(), e);
        }
    }

    public void close() {
        if (this.channelFuture != null) {
            this.channelFuture.channel().close();
        }
        NettyClientHolder.get().remove(this.imei);
    }

    public void send(String message) {
        try {
            if (!this.channelFuture.channel().isActive()) {
                log.info("Channel inactive imei={}", this.imei);
                return;
            }
            if (!StringUtils.isEmpty(message)) {
                log.info("Queue message send => {}", message);
                this.channelFuture.channel().writeAndFlush(message);
            }
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }

    private void init() throws Exception {
        BaseClientHandler clientHandler = SpringUtils.getBean(clientHandlerClass, this);
        Bootstrap b = new Bootstrap();
        b.group(workGroup)
         .channel(NioSocketChannel.class)
         .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
         .option(ChannelOption.SO_RCVBUF, 1024 * 32)
         .option(ChannelOption.SO_SNDBUF, 1024 * 32)
         .handler(new ChannelInitializer<SocketChannel>() {
             @Override
             protected void initChannel(SocketChannel ch) throws Exception {
                 ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024 * 1024,
                         Unpooled.copiedBuffer("\n".getBytes())));
                 ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
                 ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
                 ch.pipeline().addLast(new IdleStateHandler(0, 0, 600, TimeUnit.SECONDS));
                 ch.pipeline().addLast(clientHandler);
             }
         });
        this.connect(b);
    }

    private void connect(Bootstrap b) throws InterruptedException {
        long start = System.currentTimeMillis();
        final int maxRetries = 2;
        ChannelFuture future = null;
        // One initial attempt plus up to maxRetries retries. await() blocks this client's
        // own thread and, unlike sync(), does not throw when the connect attempt fails.
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            if (attempt > 0) {
                log.info("imei={} reconnect attempt {}", imei, attempt);
            }
            future = b.connect(host, port);
            this.channelFuture = future;
            future.await();
            if (future.isSuccess()) {
                log.info("imei={} connection succeeded, ip={}, port={}", imei, host, port);
                break;
            }
        }
        if (!future.isSuccess()) {
            log.warn("imei={} connect failed after {} retries", imei, maxRetries, future.cause());
            return;
        }
        log.info("Device imei={}, channelId={}, connect time={}ms", imei,
                 channelFuture.channel().id(), System.currentTimeMillis() - start);
        // keep this client's thread blocked until the channel is closed
        channelFuture.channel().closeFuture().sync();
    }

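    // Note: process() calls this method as this.createClientAndSend(...), and it relies on the
    // executor and worker-group helpers, so it appears to belong to MessageProcessor
    // rather than to NettyClient.
    private void createClientAndSend(NettyMsgModel nettyMsgModel) {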
        log.info("Creating client imei={}", nettyMsgModel.getImei());
        NettyClient nettyClient = SpringUtils.getBean(NettyClient.class,
                nettyMsgModel.getImei(), nettyMsgModel.getBizData(),
                this.createDefaultWorkGroup(this.workerThread), DemoClientHandler.class);
        executor.execute(nettyClient);
        try {
            synchronized (nettyClient) {
                long t1 = System.currentTimeMillis();
                nettyClient.wait(5000);
                long t2 = System.currentTimeMillis();
                log.info("Client creation wait time={}ms", t2 - t1);
            }
            if (nettyClient.getChannelFuture() != null && nettyClient.getChannelFuture().channel().isActive()) {
                NettyClientHolder.get().put(nettyMsgModel.getImei(), nettyClient);
                nettyClient.send(nettyMsgModel.getMsg());
            } else {
                log.warn("Client creation failed imei={}", nettyMsgModel.getImei());
                nettyClient.close();
            }
        } catch (Exception e) {
            log.error("Client init send exception => {}", e.getMessage(), e);
        }
    }
}
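The pipeline in the listing frames inbound data on a newline delimiter before decoding it to a String. As a rough illustration of what delimiter framing does, the plain‑Java sketch below buffers incoming chunks and emits one frame per delimiter, with the delimiter stripped. LineFramer is a hypothetical class written for this explanation; it is not Netty's implementation and omits the maximum frame length check.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Plain-Java illustration of newline-delimited framing; not Netty code. */
class LineFramer {
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    /** Feeds one received chunk and returns every frame that the chunk completes. */
    List<String> feed(byte[] chunk) {
        List<String> frames = new ArrayList<>();
        for (byte b : chunk) {
            if (b == '\n') {
                // delimiter reached: emit the buffered bytes as one frame, without the delimiter
                frames.add(new String(pending.toByteArray(), StandardCharsets.UTF_8));
                pending.reset();
            } else {
                pending.write(b);
            }
        }
        return frames; // bytes after the last delimiter stay buffered for the next chunk
    }

    public static void main(String[] args) {
        LineFramer framer = new LineFramer();
        System.out.println(framer.feed("Hello ".getBytes(StandardCharsets.UTF_8)));       // []
        System.out.println(framer.feed("World!\nHel".getBytes(StandardCharsets.UTF_8)));  // [Hello World!]
        System.out.println(framer.feed("lo\n".getBytes(StandardCharsets.UTF_8)));         // [Hello]
    }
}
```

One consequence worth noting: a peer that frames its input the same way only sees a message once the delimiter arrives. StringEncoder does not append a delimiter, so if the server uses delimiter framing too (an assumption, since the server pipeline is not shown), outgoing messages need to end with it.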

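After submitting the client to the executor, createClientAndSend waits on the client's monitor for up to 5 seconds. The matching notify call is not shown in this excerpt; presumably the client handler issues it once the channel becomes active. The timeout bounds how long a worker thread can be held, which avoids exhausting the thread pool.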
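One general caveat with wait and notify is that a notification sent before the waiting thread has entered wait is lost, in which case the waiter sleeps for the full timeout. Whether the demo can hit this depends on handler code that the article does not show. As a possible alternative, the sketch below uses a CountDownLatch, which remembers the signal. ReadySignal is a hypothetical class, not part of the demo.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical readiness signal; a latch keeps a signal that fires before the wait starts. */
class ReadySignal {
    private final CountDownLatch latch = new CountDownLatch(1);

    /** Called by the connecting side, for example from a channel-active callback. */
    void markReady() {
        latch.countDown();
    }

    /** Waits up to timeoutMs; returns true if ready, false if the timeout elapsed first. */
    boolean awaitReady(long timeoutMs) throws InterruptedException {
        return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        ReadySignal signal = new ReadySignal();
        signal.markReady();                          // signal arrives before anyone waits
        System.out.println(signal.awaitReady(5000)); // true, returns immediately
    }
}
```

The boolean result also lets the caller tell a timeout apart from a successful start without inspecting the channel first.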

Testing

A DemoController provides three test endpoints:

testOne : sends two messages with a 5‑second interval.

testTwo : sends a custom imei and message.

testThree : sends two messages for the same imei; the second is re‑queued due to the Redis lock.

package org.example.client.controller;
import org.example.client.QueueHolder;
import org.example.client.model.NettyMsgModel;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/demo")
public class DemoController {

    /** Send two messages with interval */
    @GetMapping("testOne")
    public void testOne() {
        QueueHolder.get().offer(NettyMsgModel.create("87654321", "Hello World!"));
        try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); }
        QueueHolder.get().offer(NettyMsgModel.create("87654321", "Hello World Too!"));
    }

    /** Send arbitrary message */
    @GetMapping("testTwo")
    public void testTwo(@RequestParam String imei, @RequestParam String msg) {
        QueueHolder.get().offer(NettyMsgModel.create(imei, msg));
    }

    /** Send two messages for same imei; second will be delayed by lock */
    @GetMapping("testThree")
    public void testThree() {
        QueueHolder.get().offer(NettyMsgModel.create("12345678", "Hello World!"));
        QueueHolder.get().offer(NettyMsgModel.create("12345678", "Hello World Too!"));
    }
}

Log screenshots (omitted) show that the first message triggers client creation, the second uses the existing channel, and in testThree the second message is re‑queued and consumed later.
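The delayed re‑queue seen in testThree is implemented in process with a new Timer per message, and each Timer starts its own background thread. A possible variation, sketched below under that observation, schedules all re‑queues on one shared ScheduledExecutorService. DelayedRequeue is a hypothetical class, and the generic queue stands in for the one returned by QueueHolder.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: re-queues messages after a delay on one shared scheduler thread. */
class DelayedRequeue<T> {
    private final BlockingQueue<T> queue;
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "requeue-scheduler");
                t.setDaemon(true); // do not keep the JVM alive just for pending re-queues
                return t;
            });

    DelayedRequeue(BlockingQueue<T> queue) {
        this.queue = queue;
    }

    /** Offers msg to the queue after delayMs; the message is dropped if the queue is full. */
    void requeueLater(T msg, long delayMs) {
        scheduler.schedule(() -> { queue.offer(msg); }, delayMs, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
        DelayedRequeue<String> requeue = new DelayedRequeue<>(queue);
        requeue.requeueLater("Hello World Too!", 200);
        System.out.println(queue.poll(2, TimeUnit.SECONDS)); // Hello World Too!
        requeue.shutdown();
    }
}
```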

Source Code

The full demo repository is available at https://gitee.com/jaster/netty-tcp-demo .
