How to Build a Scalable Netty TCP Client with Redis Queue and Spring Boot
This article walks through a complete Netty TCP client demo for IoT messaging. It covers the project architecture, module layout, and business flow, then explains the code in detail: a local BlockingQueue standing in for RocketMQ, thread-safe client creation guarded by a Redis lock, and the test endpoints. The result is a practical reference for building robust socket-based services.
Project Background
Recently a company IoT project required a long‑lived socket connection for message communication. After several buggy attempts, the author refined the working code into a demo project, stripping away business‑specific parts to share for learning.
Project Architecture
The demo uses Netty, Redis, and Spring Boot 2.2.0.
Project Modules
The project contains three modules (directory-structure screenshot omitted). netty-tcp-core is a common utilities module, netty-tcp-server is a test server (not used in production), and netty-tcp-client is the focus of this article.
Business Process
In the real project RocketMQ is used as the message queue; the demo replaces it with a local BlockingQueue. The data flow is:
producer -> message queue -> consumer (client) -> TCP channel -> server -> TCP channel -> client
When the consumer receives a device message, it checks whether the cache already holds a connection to the server for that device. If the channel is active, the message is sent directly; otherwise a new channel is created, and the message is sent once the channel becomes active. The client then handles the server's responses.
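The send-or-create decision described above can be sketched without Netty or Redis. This is a minimal illustration, not the demo's code: FakeConnection and FlowSketch are hypothetical names, and the "connection" is just an in-memory object that records what was sent.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for a per-device connection; not a class from the demo. */
final class FakeConnection {
    private volatile boolean active = true;
    final List<String> sent = new ArrayList<>();

    boolean isActive() { return active; }
    void close() { active = false; }
    void send(String msg) { sent.add(msg); }
}

/** Routes each message to a cached connection, creating one when needed. */
final class FlowSketch {
    private final Map<String, FakeConnection> cache = new ConcurrentHashMap<>();
    private int created = 0;

    /** Returns "reused" or "created" so the path taken is visible to the caller. */
    synchronized String dispatch(String imei, String msg) {
        FakeConnection conn = cache.get(imei);
        if (conn != null && conn.isActive()) {
            conn.send(msg);                          // active channel: send directly
            return "reused";
        }
        if (conn != null) {
            cache.remove(imei);                      // stale entry: drop it before reconnecting
        }
        FakeConnection fresh = new FakeConnection(); // the demo would open a TCP channel here
        cache.put(imei, fresh);
        created++;
        fresh.send(msg);                             // send once the new channel is usable
        return "created";
    }

    synchronized int createdCount() { return created; }

    FakeConnection connectionFor(String imei) { return cache.get(imei); }

    public static void main(String[] args) {
        FlowSketch flow = new FlowSketch();
        System.out.println(flow.dispatch("87654321", "Hello World!"));
        System.out.println(flow.dispatch("87654321", "Hello World Too!"));
    }
}
```

The sketch serializes all devices behind one lock for brevity; the demo instead guards each imei separately, as the code below shows.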
Code Details
1. Message Queue
Because the demo removes the message middleware, a local queue is created to simulate the real scenario.
package org.example.client;
import org.example.client.model.NettyMsgModel;
import java.util.concurrent.ArrayBlockingQueue;
/**
* Demo project uses a local queue; production should use a message middleware (RocketMQ or RabbitMQ)
*/
public class QueueHolder {
private static final ArrayBlockingQueue<NettyMsgModel> queue = new ArrayBlockingQueue<>(100);
public static ArrayBlockingQueue<NettyMsgModel> get() {
return queue;
}
}
A LoopThread submits MAIN_THREAD_POOL_SIZE worker tasks to an executor; each task continuously takes messages from the queue and hands them to the message processor.
public class LoopThread implements Runnable {
@Override
public void run() {
for (int i = 0; i < MAIN_THREAD_POOL_SIZE; i++) {
executor.execute(() -> {
while (true) {
// take the head object from BlockingQueue, block if empty
try {
NettyMsgModel nettyMsgModel = QueueHolder.get().take();
messageProcessor.process(nettyMsgModel);
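} catch (InterruptedException e) {
// restore the interrupt flag and stop this worker so the pool can shut down
Thread.currentThread().interrupt();
log.error(e.getMessage(), e);
return;
}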
}
});
}
}
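}
The take method blocks the calling thread until a message arrives. The excerpt omits the declarations of executor, messageProcessor, log, and MAIN_THREAD_POOL_SIZE.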
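For readers who want to run the consumer loop in isolation, here is a self-contained sketch using only the JDK. ConsumerLoopSketch and runDemo are illustrative names, and a counter stands in for messageProcessor.process. It also shows one way to let the workers exit when the pool is shut down.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class ConsumerLoopSketch {
    /** Starts `workers` consumers, feeds `messages` items, and returns how many were processed. */
    static int runDemo(int workers, int messages) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
        AtomicInteger processed = new AtomicInteger();
        ExecutorService executor = Executors.newFixedThreadPool(workers);

        for (int i = 0; i < workers; i++) {
            executor.execute(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        String msg = queue.take();          // blocks while the queue is empty
                        processed.incrementAndGet();        // stand-in for processing msg
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt(); // restore the flag so the loop exits
                    }
                }
            });
        }

        try {
            for (int i = 0; i < messages; i++) {
                queue.put("msg-" + i);                      // blocks if the bounded queue is full
            }
            // Bounded wait until the consumers have drained everything.
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (processed.get() < messages && System.nanoTime() < deadline) {
                Thread.sleep(10);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();                         // interrupts workers blocked in take()
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println("processed=" + runDemo(3, 20));
    }
}
```

Checking the interrupt flag in the loop condition is a design choice: it lets shutdownNow() end the workers cleanly instead of leaving them blocked in take().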
2. Execution Class
The process method belongs to MessageProcessor, a singleton that may be invoked by multiple threads.
public void process(NettyMsgModel nettyMsgModel) {
    String imei = nettyMsgModel.getImei();
    boolean lockAcquired = false; // only the thread that set the lock key may delete it
    try {
        synchronized (this) { // avoid creating duplicate clients for the same device
            if (redisCache.hasKey(NETTY_QUEUE_LOCK + imei)) {
                log.info("imei={} message processing, re-queue", imei);
                new Timer().schedule(new TimerTask() {
                    @Override
                    public void run() {
                        QueueHolder.get().offer(nettyMsgModel);
                    }
                }, 2000);
                log.info("imei={} re-queue scheduled", imei);
                return;
            } else {
                redisCache.setCacheObject(NETTY_QUEUE_LOCK + imei, "1", 120, TimeUnit.SECONDS);
                lockAcquired = true;
            }
        }
        if (NettyClientHolder.get().containsKey(imei)) {
            NettyClient nettyClient = NettyClientHolder.get().get(imei);
            if (nettyClient.getChannelFuture() != null && nettyClient.getChannelFuture().channel().isActive()) {
                if (!nettyClient.getChannelFuture().channel().isWritable()) {
                    log.warn("Channel not writable, imei={}, channelId={}", nettyClient.getImei(),
                            nettyClient.getChannelFuture().channel().id());
                }
                nettyClient.send(nettyMsgModel.getMsg());
            } else {
                log.info("client imei={}, channel inactive, closing", nettyClient.getImei());
                nettyClient.close();
                this.createClientAndSend(nettyMsgModel);
            }
        } else {
            this.createClientAndSend(nettyMsgModel);
        }
    } catch (Exception e) {
        log.error(e.getMessage(), e);
    } finally {
        // the re-queue path returns without holding the lock, so it must not delete another thread's key
        if (lockAcquired) {
            redisCache.deleteObject(NETTY_QUEUE_LOCK + imei);
        }
    }
}
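}
The imei uniquely identifies a device. The synchronized block makes the check-and-set on the Redis key atomic within a single JVM, and the key (with a 120-second expiry) marks the imei as in progress, so a second message for the same device is re-queued instead of creating a duplicate client. Across several application instances the separate hasKey and setCacheObject calls are not atomic, so a multi-node deployment would need an atomic set-if-absent operation instead.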
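The check-then-set pattern above can be collapsed into a single atomic step. The sketch below uses a ConcurrentHashMap as an in-memory stand-in for Redis, so it runs without a server. OwnedLockSketch and its methods are hypothetical and not part of the demo's redisCache wrapper. In Redis itself the equivalent operation is SET with the NX and EX options, and the owner-checked delete is usually done in a small Lua script. Unlike the Redis key, the stand-in has no expiry.

```java
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** In-memory stand-in for a Redis key space; names here are illustrative only. */
final class OwnedLockSketch {
    private final ConcurrentMap<String, String> keys = new ConcurrentHashMap<>();

    /** Atomically sets the key if absent. Returns an owner token, or null if already locked. */
    String tryLock(String imei) {
        String token = UUID.randomUUID().toString();
        return keys.putIfAbsent("lock:" + imei, token) == null ? token : null;
    }

    /** Deletes the key only if it still holds the caller's token. */
    boolean unlock(String imei, String token) {
        return token != null && keys.remove("lock:" + imei, token);
    }

    public static void main(String[] args) {
        OwnedLockSketch locks = new OwnedLockSketch();
        String first = locks.tryLock("12345678");
        String second = locks.tryLock("12345678");
        System.out.println("first acquired=" + (first != null));
        System.out.println("second acquired=" + (second != null));
        System.out.println("released by owner=" + locks.unlock("12345678", first));
    }
}
```

Storing a per-owner token means a caller that never acquired the lock cannot release it by accident, which is the failure mode to watch for when a finally block deletes the key unconditionally.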
3. Client
The Netty client is a prototype‑scoped bean; each instance runs in its own thread and shares a common EventLoopGroup passed from outside to reduce resource consumption.
package org.example.client;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.example.client.handler.BaseClientHandler;
import org.example.core.util.SpringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
@Component
@Scope("prototype")
@Getter
@NoArgsConstructor
public class NettyClient implements Runnable {
@Value("${netty.server.port}")
private int port;
@Value("${netty.server.host}")
private String host;
private String imei; // client unique identifier
private Map<String, Object> bizData; // custom business data
private EventLoopGroup workGroup;
private Class<BaseClientHandler> clientHandlerClass;
private ChannelFuture channelFuture;
public NettyClient(String imei, Map<String, Object> bizData, EventLoopGroup workGroup,
Class<BaseClientHandler> clientHandlerClass) {
this.imei = imei;
this.bizData = bizData;
this.workGroup = workGroup;
this.clientHandlerClass = clientHandlerClass;
}
@Override
public void run() {
try {
this.init();
log.info("Client start imei={}", imei);
} catch (Exception e) {
log.error("Client start failed: {}", e.getMessage(), e);
}
}
public void close() {
if (this.channelFuture != null) {
this.channelFuture.channel().close();
}
NettyClientHolder.get().remove(this.imei);
}
public void send(String message) {
try {
if (!this.channelFuture.channel().isActive()) {
log.info("Channel inactive imei={}", this.imei);
return;
}
if (!StringUtils.isEmpty(message)) {
log.info("Queue message send => {}", message);
this.channelFuture.channel().writeAndFlush(message);
}
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
private void init() throws Exception {
BaseClientHandler clientHandler = SpringUtils.getBean(clientHandlerClass, this);
Bootstrap b = new Bootstrap();
b.group(workGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
.option(ChannelOption.SO_RCVBUF, 1024 * 32)
.option(ChannelOption.SO_SNDBUF, 1024 * 32)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024 * 1024,
Unpooled.copiedBuffer("\n".getBytes())));
ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
ch.pipeline().addLast(new IdleStateHandler(0, 0, 600, TimeUnit.SECONDS));
ch.pipeline().addLast(clientHandler);
}
});
this.connect(b);
}
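private void connect(Bootstrap b) throws InterruptedException {
    long start = System.currentTimeMillis();
    final int maxRetries = 2;
    final AtomicInteger count = new AtomicInteger();
    final AtomicBoolean success = new AtomicBoolean(false);
    try {
        // Anonymous listener rather than a lambda: inside it, "this" is the listener
        // itself, so it can be registered again on each retry.
        this.channelFuture = b.connect(host, port).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                if (!future.isSuccess()) {
                    if (count.incrementAndGet() > maxRetries) {
                        log.warn("imei={} reconnect exceeded {} times", imei, maxRetries);
                    } else {
                        log.info("imei={} reconnect attempt {}", imei, count);
                        b.connect(host, port).addListener(this);
                    }
                } else {
                    log.info("imei={} connection succeeded, ip={}, port={}", imei, host, port);
                    success.set(true);
                }
            }
        }).sync();
    } catch (Exception e) {
        log.error(e.getMessage(), e);
    }
    if (channelFuture == null) {
        // sync() threw before the field was assigned, so there is no channel to report on;
        // a retried connection is not stored in channelFuture in this demo
        log.warn("Device imei={} initial connect failed after {}ms", imei, System.currentTimeMillis() - start);
        return;
    }
    log.info("Device imei={}, channelId={}, connect time={}ms", imei,
            channelFuture.channel().id(), System.currentTimeMillis() - start);
    if (success.get()) {
        // block this client's thread until the channel is closed
        channelFuture.channel().closeFuture().sync();
    }
}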
}
The createClientAndSend method called from process belongs to MessageProcessor rather than NettyClient:
private void createClientAndSend(NettyMsgModel nettyMsgModel) {
    log.info("Creating client imei={}", nettyMsgModel.getImei());
    NettyClient nettyClient = SpringUtils.getBean(NettyClient.class,
            nettyMsgModel.getImei(), nettyMsgModel.getBizData(),
            this.createDefaultWorkGroup(this.workerThread), DemoClientHandler.class);
    executor.execute(nettyClient);
    try {
        synchronized (nettyClient) {
            long t1 = System.currentTimeMillis();
            nettyClient.wait(5000); // wait up to 5 seconds for the channel to become active
            long t2 = System.currentTimeMillis();
            log.info("Client creation wait time={}ms", t2 - t1);
        }
        if (nettyClient.getChannelFuture() != null && nettyClient.getChannelFuture().channel().isActive()) {
            NettyClientHolder.get().put(nettyMsgModel.getImei(), nettyClient);
            nettyClient.send(nettyMsgModel.getMsg());
        } else {
            log.warn("Client creation failed imei={}", nettyMsgModel.getImei());
            nettyClient.close();
        }
    } catch (Exception e) {
        log.error("Client init send exception => {}", e.getMessage(), e);
    }
}
The caller waits on the client object for up to 5 seconds for the channel to become active, then checks the channel state; the matching notify call is not part of the excerpt. The timeout bounds the wait so that a failed connection cannot tie up a pool thread indefinitely.
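One caveat with wait and notify is that a notify issued before the waiter enters wait is lost, and the waiter then sleeps for the full timeout. A CountDownLatch remembers the signal, so it is a common alternative. The sketch below is a JDK-only illustration. ReadySignalSketch and markActive are hypothetical names; in a Netty client the signal would plausibly be raised from the handler once the channel is active.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class ReadySignalSketch {
    private final CountDownLatch ready = new CountDownLatch(1);

    /** Called by the connecting side once the channel is active (hypothetical hook). */
    void markActive() {
        ready.countDown();
    }

    /** Waits up to timeoutMillis; true if the signal arrived, false on timeout or interrupt. */
    boolean awaitActive(long timeoutMillis) {
        try {
            return ready.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        ReadySignalSketch client = new ReadySignalSketch();
        new Thread(client::markActive).start();
        System.out.println("active=" + client.awaitActive(5000));
    }
}
```

Because the latch stays open once counted down, the result does not depend on whether the signal or the wait happens first.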
Testing
A DemoController provides three test endpoints:
testOne : sends two messages with a 5‑second interval.
testTwo : sends a custom imei and message.
testThree : sends two messages for the same imei; the second is re‑queued due to the Redis lock.
package org.example.client.controller;
import org.example.client.QueueHolder;
import org.example.client.model.NettyMsgModel;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("/demo")
public class DemoController {
/** Send two messages with interval */
@GetMapping("testOne")
public void testOne() {
QueueHolder.get().offer(NettyMsgModel.create("87654321", "Hello World!"));
try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); }
QueueHolder.get().offer(NettyMsgModel.create("87654321", "Hello World Too!"));
}
/** Send arbitrary message */
@GetMapping("testTwo")
public void testTwo(@RequestParam String imei, @RequestParam String msg) {
QueueHolder.get().offer(NettyMsgModel.create(imei, msg));
}
/** Send two messages for same imei; second will be delayed by lock */
@GetMapping("testThree")
public void testThree() {
QueueHolder.get().offer(NettyMsgModel.create("12345678", "Hello World!"));
QueueHolder.get().offer(NettyMsgModel.create("12345678", "Hello World Too!"));
}
}
Log screenshots (omitted) show that in testOne the first message triggers client creation and the second reuses the existing channel, and that in testThree the second message is re-queued and consumed later.
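The endpoints above call offer and ignore its return value. On a bounded ArrayBlockingQueue, offer returns false when the queue is full, so under load a message could be dropped silently. The sketch below shows the timed variant of offer and a way to surface the rejection. BoundedOfferSketch and its methods are illustrative names, not part of the demo.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

final class BoundedOfferSketch {
    /** Tries to enqueue, waiting up to timeoutMillis for space. Returns false if not accepted. */
    static boolean enqueue(BlockingQueue<String> queue, String msg, long timeoutMillis) {
        try {
            return queue.offer(msg, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Fills a queue of the given capacity and reports whether one extra message is accepted. */
    static boolean extraAccepted(int capacity) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);
        for (int i = 0; i < capacity; i++) {
            queue.offer("msg-" + i);
        }
        return enqueue(queue, "overflow", 20);
    }

    public static void main(String[] args) {
        System.out.println("extra accepted on a full queue=" + extraAccepted(2));
    }
}
```

A controller could return an error status when enqueue reports false, rather than acknowledging a message that never reached the queue.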
Source Code
The full demo repository is available at https://gitee.com/jaster/netty-tcp-demo .
