Netty TCP Client Demo with Local Queue, Redis Lock, and Spring Boot Integration
This article presents a complete Netty TCP client demo that uses a local BlockingQueue to simulate a message broker, applies Redis distributed locks to prevent duplicate connections, integrates Spring Boot 2.2.0, and provides detailed code, workflow, and testing instructions for developers.
Project Background: A recent IoT project required a long‑living socket connection for message communication. After encountering numerous bugs, the author refined the working code into an open‑source demo that removes business‑specific logic to facilitate learning.
Project Architecture: The demo is built with Netty for networking, Redis for distributed locking, and Spring Boot 2.2.0 for dependency management.
Project Modules: The repository contains three modules – netty-tcp-core (utility classes), netty-tcp-server (a test server, not used in production), and netty-tcp-client (the focus of this article).
Business Flow: In the original system RocketMQ was used as the message queue; the demo replaces it with a local BlockingQueue . The flow is Producer → Queue → Consumer (client) → TCP channel → Server → TCP channel → Client. The consumer checks whether a channel for the device already exists; if not, it creates one, otherwise it reuses the existing channel.
Message Queue Implementation: package org.example.client; import org.example.client.model.NettyMsgModel; import java.util.concurrent.ArrayBlockingQueue; /** * Demo uses a local queue; in production replace with RocketMQ or RabbitMQ */ public class QueueHolder { private static final ArrayBlockingQueue<NettyMsgModel> queue = new ArrayBlockingQueue<>(100); public static ArrayBlockingQueue<NettyMsgModel> get() { return queue; } }
Loop Thread to Consume Queue: public class LoopThread implements Runnable { @Override public void run() { for (int i = 0; i < MAIN_THREAD_POOL_SIZE; i++) { executor.execute(() -> { while (true) { try { NettyMsgModel nettyMsgModel = QueueHolder.get().take(); messageProcessor.process(nettyMsgModel); } catch (InterruptedException e) { log.error(e.getMessage(), e); } } }); } } }
Processing Logic with Redis Lock: public void process(NettyMsgModel nettyMsgModel) { String imei = nettyMsgModel.getImei(); try { synchronized (this) { if (redisCache.hasKey(NETTY_QUEUE_LOCK + imei)) { log.info("imei={} message processing, re‑queue", imei); new Timer().schedule(new TimerTask() { public void run() { QueueHolder.get().offer(nettyMsgModel); } }, 2000); return; } else { redisCache.setCacheObject(NETTY_QUEUE_LOCK + imei, "1", 120, TimeUnit.SECONDS); } } if (NettyClientHolder.get().containsKey(imei)) { NettyClient nettyClient = NettyClientHolder.get().get(imei); if (nettyClient.getChannelFuture() != null && nettyClient.getChannelFuture().channel().isActive()) { nettyClient.send(nettyMsgModel.getMsg()); } else { nettyClient.close(); this.createClientAndSend(nettyMsgModel); } } else { this.createClientAndSend(nettyMsgModel); } } catch (Exception e) { log.error(e.getMessage(), e); } finally { redisCache.deleteObject(NETTY_QUEUE_LOCK + imei); } }
Client Creation and Sending: private void createClientAndSend(NettyMsgModel nettyMsgModel) { NettyClient nettyClient = SpringUtils.getBean(NettyClient.class, nettyMsgModel.getImei(), nettyMsgModel.getBizData(), this.createDefaultWorkGroup(this.workerThread), DemoClientHandler.class); executor.execute(nettyClient); synchronized (nettyClient) { nettyClient.wait(5000); } if (nettyClient.getChannelFuture() != null && nettyClient.getChannelFuture().channel().isActive()) { NettyClientHolder.get().put(nettyMsgModel.getImei(), nettyClient); nettyClient.send(nettyMsgModel.getMsg()); } else { nettyClient.close(); } }
Netty Client Implementation (core parts): public class NettyClient implements Runnable { @Value("${netty.server.port}") private int port; @Value("${netty.server.host}") private String host; private String imei; private EventLoopGroup workGroup; private ChannelFuture channelFuture; public void run() { init(); log.info("Client started imei={}", imei); } private void init() throws Exception { Bootstrap b = new Bootstrap(); b.group(workGroup).channel(NioSocketChannel.class) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000) .handler(new ChannelInitializer () { protected void initChannel(SocketChannel ch) { ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024 * 1024, Unpooled.copiedBuffer("\r\n".getBytes()))); ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8)); ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8)); ch.pipeline().addLast(new IdleStateHandler(0, 0, 600, TimeUnit.SECONDS)); ch.pipeline().addLast(clientHandler); } }); connect(b); } private void connect(Bootstrap b) throws InterruptedException { final int maxRetries = 2; AtomicInteger count = new AtomicInteger(); channelFuture = b.connect(host, port).addListener(future -> { if (!future.isSuccess()) { if (count.incrementAndGet() > maxRetries) { log.warn("imei={} retries exceeded", imei); } else { b.connect(host, port).addListener(this); } } else { log.info("imei={} connected", imei); } }).sync(); if (channelFuture.channel().isActive()) { channelFuture.channel().closeFuture().sync(); } } }
DemoClientHandler (channel events): public class DemoClientHandler extends BaseClientHandler { private final String imei; private final NettyClient nettyClient; @Override public void channelActive(ChannelHandlerContext ctx) { log.info("Client imei={} channel active", imei); synchronized (nettyClient) { nettyClient.notify(); } } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { log.info("Client imei={} received: {}", imei, msg); if ("shutdown".equals(msg)) nettyClient.close(); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { if (evt instanceof IdleStateEvent) { /* handle idle */ } } }
Testing Endpoints: The demo provides three REST endpoints (testOne, testTwo, testThree) that enqueue messages, demonstrate delayed re‑queueing when a lock is held, and show normal message delivery. Sample logs illustrate client creation, message sending, and channel reuse.
Source Code: The full project is available at https://gitee.com/jaster/netty-tcp-demo .
Postscript: This demo is intended for learning and discussion only; it is not production‑ready and may require additional improvements before real‑world use.
Architect's Guide
Dedicated to sharing programmer-architect skills—Java backend, system, microservice, and distributed architectures—to help you become a senior architect.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.