Backend Development · 17 min read

Netty TCP Client Demo with Local Queue, Redis Lock, and Spring Boot Integration

This article presents a complete Netty TCP client demo that uses a local BlockingQueue to simulate a message broker, applies Redis distributed locks to prevent duplicate connections, integrates Spring Boot 2.2.0, and provides detailed code, workflow, and testing instructions for developers.

Architect's Guide

Project Background: A recent IoT project required a long‑lived socket connection for message communication. After working through numerous bugs, the author distilled the working code into an open‑source demo that removes business‑specific logic to make it easier to learn from.

Project Architecture: The demo is built with Netty for networking, Redis for distributed locking, and Spring Boot 2.2.0 for dependency management.

Project Modules: The repository contains three modules – netty-tcp-core (utility classes), netty-tcp-server (a test server, not used in production), and netty-tcp-client (the focus of this article).

Business Flow: In the original system RocketMQ was used as the message queue; the demo replaces it with a local BlockingQueue. The flow is Producer → Queue → Consumer (client) → TCP channel → Server → TCP channel → Client. The consumer checks whether a channel for the device already exists; if not, it creates one, otherwise it reuses the existing channel.

Message Queue Implementation:

```java
package org.example.client;

import org.example.client.model.NettyMsgModel;
import java.util.concurrent.ArrayBlockingQueue;

/**
 * The demo uses a local queue; in production replace it with RocketMQ or RabbitMQ.
 */
public class QueueHolder {

    private static final ArrayBlockingQueue<NettyMsgModel> queue = new ArrayBlockingQueue<>(100);

    public static ArrayBlockingQueue<NettyMsgModel> get() {
        return queue;
    }
}
```
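To see the producer/consumer flow end to end without the rest of the demo, the holder pattern can be exercised on its own. This is a minimal stand-in sketch, not code from the repository: it uses `String` in place of `NettyMsgModel` (an assumption about the model, made only to keep the example self-contained).

```java
import java.util.concurrent.ArrayBlockingQueue;

// Illustrative stand-in for QueueHolder, using String instead of NettyMsgModel.
public class QueueHolderSketch {

    private static final ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(100);

    public static ArrayBlockingQueue<String> get() {
        return queue;
    }

    public static void main(String[] args) throws InterruptedException {
        // Producer side: offer() returns false instead of blocking when the queue is full.
        boolean accepted = get().offer("msg for imei 860000000000001");
        System.out.println("accepted=" + accepted);
        // Consumer side: take() blocks until a message is available.
        System.out.println("consumed=" + get().take());
    }
}
```

The bounded capacity (100) means `offer()` silently drops messages under backpressure, which is acceptable for a demo; a real broker would persist them instead.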

Loop Thread to Consume Queue:

```java
public class LoopThread implements Runnable {

    // executor, messageProcessor, and MAIN_THREAD_POOL_SIZE are injected/configured
    // fields of the surrounding class, omitted here for brevity.
    @Override
    public void run() {
        // Start a fixed number of consumer threads that block on the queue.
        for (int i = 0; i < MAIN_THREAD_POOL_SIZE; i++) {
            executor.execute(() -> {
                while (true) {
                    try {
                        // take() blocks until a message is available.
                        NettyMsgModel nettyMsgModel = QueueHolder.get().take();
                        messageProcessor.process(nettyMsgModel);
                    } catch (InterruptedException e) {
                        log.error(e.getMessage(), e);
                        // Restore the interrupt flag so the pool can shut down cleanly.
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }
    }
}
```

Processing Logic with Redis Lock:

```java
public void process(NettyMsgModel nettyMsgModel) {
    String imei = nettyMsgModel.getImei();
    boolean locked = false;
    try {
        synchronized (this) {
            // Note: hasKey + set is only atomic within this JVM thanks to
            // synchronized; across instances a single Redis SET ... NX EX
            // would be the safer primitive.
            if (redisCache.hasKey(NETTY_QUEUE_LOCK + imei)) {
                // Another message for this device is in flight: re-queue after 2 seconds.
                log.info("imei={} message processing, re-queue", imei);
                new Timer().schedule(new TimerTask() {
                    @Override
                    public void run() {
                        QueueHolder.get().offer(nettyMsgModel);
                    }
                }, 2000);
                return;
            } else {
                // Acquire the lock with a 120-second expiry as a safety net.
                redisCache.setCacheObject(NETTY_QUEUE_LOCK + imei, "1", 120, TimeUnit.SECONDS);
                locked = true;
            }
        }
        if (NettyClientHolder.get().containsKey(imei)) {
            // Reuse the existing channel if it is still active.
            NettyClient nettyClient = NettyClientHolder.get().get(imei);
            if (nettyClient.getChannelFuture() != null
                    && nettyClient.getChannelFuture().channel().isActive()) {
                nettyClient.send(nettyMsgModel.getMsg());
            } else {
                nettyClient.close();
                this.createClientAndSend(nettyMsgModel);
            }
        } else {
            this.createClientAndSend(nettyMsgModel);
        }
    } catch (Exception e) {
        log.error(e.getMessage(), e);
    } finally {
        // Only release the lock if this call acquired it; otherwise the early
        // return above would delete a lock held by another processor.
        if (locked) {
            redisCache.deleteObject(NETTY_QUEUE_LOCK + imei);
        }
    }
}
```
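The essential property the lock needs is an atomic check-and-set: "set this key only if it does not exist yet", which Redis provides natively (SET with NX and EX). A minimal single-JVM sketch of that semantics, using `ConcurrentHashMap.putIfAbsent` as a stand-in for Redis (the class, method names, and key prefix here are illustrative, not from the demo):

```java
import java.util.concurrent.ConcurrentHashMap;

// Illustrative stand-in for a "SET key value NX"-style per-device lock.
public class ImeiLockSketch {

    private static final ConcurrentHashMap<String, String> locks = new ConcurrentHashMap<>();

    // Atomically acquire the lock: returns true only for the first caller per key.
    public static boolean tryAcquire(String imei) {
        return locks.putIfAbsent("netty:queue:lock:" + imei, "1") == null;
    }

    public static void release(String imei) {
        locks.remove("netty:queue:lock:" + imei);
    }
}
```

Because `putIfAbsent` is a single atomic operation, no surrounding `synchronized` block is needed; the same holds for Redis SET NX across multiple application instances (plus an expiry as a crash safety net).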

Client Creation and Sending:

```java
private void createClientAndSend(NettyMsgModel nettyMsgModel) throws InterruptedException {
    // Each client is a prototype-scoped bean, constructed per device.
    NettyClient nettyClient = SpringUtils.getBean(NettyClient.class,
            nettyMsgModel.getImei(), nettyMsgModel.getBizData(),
            this.createDefaultWorkGroup(this.workerThread), DemoClientHandler.class);
    executor.execute(nettyClient);
    // Wait up to 5 seconds for channelActive() to call notify() on the client.
    synchronized (nettyClient) {
        nettyClient.wait(5000);
    }
    if (nettyClient.getChannelFuture() != null
            && nettyClient.getChannelFuture().channel().isActive()) {
        NettyClientHolder.get().put(nettyMsgModel.getImei(), nettyClient);
        nettyClient.send(nettyMsgModel.getMsg());
    } else {
        nettyClient.close();
    }
}
```
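The wait/notify handshake works, but it can miss a `notify()` that fires before the sender reaches `wait()`. A `CountDownLatch` sidesteps that race, because a `countDown()` that happens first still lets a later `await()` return immediately. A minimal sketch of that alternative (class and method names are illustrative, not from the demo):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Illustrative connect handshake: the sending thread waits for channelActive.
public class ConnectLatchSketch {

    private final CountDownLatch connected = new CountDownLatch(1);

    // Called from the Netty I/O thread when the channel becomes active.
    public void onChannelActive() {
        connected.countDown();
    }

    // Called from the sending thread; true if connected within the timeout.
    public boolean awaitConnected(long timeoutMillis) throws InterruptedException {
        return connected.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        ConnectLatchSketch sketch = new ConnectLatchSketch();
        // Even though the "active" signal fires first, await still succeeds.
        sketch.onChannelActive();
        System.out.println("connected=" + sketch.awaitConnected(5000));
    }
}
```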

Netty Client Implementation (core parts):

```java
public class NettyClient implements Runnable {

    @Value("${netty.server.port}")
    private int port;

    @Value("${netty.server.host}")
    private String host;

    private String imei;
    private EventLoopGroup workGroup;
    private ChannelFuture channelFuture;

    @Override
    public void run() {
        try {
            init();
            log.info("Client started imei={}", imei);
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }

    private void init() throws Exception {
        Bootstrap b = new Bootstrap();
        b.group(workGroup)
         .channel(NioSocketChannel.class)
         .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
         .handler(new ChannelInitializer<SocketChannel>() {
             @Override
             protected void initChannel(SocketChannel ch) {
                 // Frame on \r\n, encode/decode UTF-8 strings, then idle detection.
                 ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024 * 1024,
                         Unpooled.copiedBuffer("\r\n".getBytes())));
                 ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
                 ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
                 ch.pipeline().addLast(new IdleStateHandler(0, 0, 600, TimeUnit.SECONDS));
                 ch.pipeline().addLast(clientHandler);
             }
         });
        connect(b);
    }

    private void connect(Bootstrap b) throws InterruptedException {
        final int maxRetries = 2;
        AtomicInteger count = new AtomicInteger();
        // A named listener is required here: inside a lambda, "this" would refer
        // to the enclosing NettyClient, not the listener being re-registered.
        ChannelFutureListener retryListener = new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                if (!future.isSuccess()) {
                    if (count.incrementAndGet() > maxRetries) {
                        log.warn("imei={} retries exceeded", imei);
                    } else {
                        // Reconnect and reuse this listener for further retries.
                        b.connect(host, port).addListener(this);
                    }
                } else {
                    log.info("imei={} connected", imei);
                }
            }
        };
        channelFuture = b.connect(host, port).addListener(retryListener).sync();
        if (channelFuture.channel().isActive()) {
            // Block this worker thread until the channel closes.
            channelFuture.channel().closeFuture().sync();
        }
    }
}
```
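The retry logic above bounds reconnect attempts with an AtomicInteger counter: one initial attempt plus at most `maxRetries` retries. That counting pattern can be isolated and tested on its own, sketched here against a plain `BooleanSupplier` instead of a Netty `ChannelFuture` (the helper class and its names are illustrative, not from the demo):

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Illustrative bounded-retry helper mirroring the client's reconnect loop.
public class RetrySketch {

    // Invoke attempt until it succeeds or maxRetries extra attempts are used up.
    public static boolean connectWithRetry(BooleanSupplier attempt, int maxRetries) {
        AtomicInteger count = new AtomicInteger();
        while (true) {
            if (attempt.getAsBoolean()) {
                return true;            // connected
            }
            if (count.incrementAndGet() > maxRetries) {
                return false;           // retries exceeded, give up
            }
        }
    }
}
```

A production version would usually add a delay (ideally with backoff) between attempts rather than retrying immediately, as the demo's listener does.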

DemoClientHandler (channel events):

```java
public class DemoClientHandler extends BaseClientHandler {

    private final String imei;
    private final NettyClient nettyClient;

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        log.info("Client imei={} channel active", imei);
        // Wake up the thread blocked in createClientAndSend().
        synchronized (nettyClient) {
            nettyClient.notify();
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        log.info("Client imei={} received: {}", imei, msg);
        if ("shutdown".equals(msg)) {
            nettyClient.close();
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
        if (evt instanceof IdleStateEvent) {
            // Handle read/write idle events here (e.g., send a heartbeat).
        }
    }
}
```

Testing Endpoints: The demo provides three REST endpoints (testOne, testTwo, testThree) that enqueue messages, demonstrate delayed re‑queueing when a lock is held, and show normal message delivery. Sample logs illustrate client creation, message sending, and channel reuse.

Source Code: The full project is available at https://gitee.com/jaster/netty-tcp-demo.

Postscript: This demo is intended for learning and discussion only; it is not production‑ready and may require additional improvements before real‑world use.

Tags: Java · Concurrency · Redis · Netty · TCP · Spring Boot · Socket
Written by

Architect's Guide

Dedicated to sharing programmer-architect skills—Java backend, system, microservice, and distributed architectures—to help you become a senior architect.
