Implementing a Netty WebSocket Client in Java
This article explains how to replace thread‑heavy WebSocket or Socket.IO clients with a Netty‑based Java WebSocket client, covering Maven dependencies, connection setup, event‑loop management, message sending, and a custom channel handler for high‑concurrency scenarios.
In earlier Socket tutorials the focus was on two client types— WebSocket and Socket.IO —which required a separate thread for each connection, consuming considerable CPU and memory resources when handling thousands of sockets.
To reduce thread usage, the author switched to a Go implementation using packages such as /net/websocket and gorilla/websocket . A comparable Java solution is provided using the Netty framework.
Dependency
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
io.netty
netty-all
4.1.85.FinalNetty WebSocket client
The client creates a connection and uses an event‑loop thread pool ( io.netty.channel.EventLoopGroup ) together with a io.netty.bootstrap.Bootstrap to manage channel creation, message sending, and receiving.
package com.funtester.socket.netty
import com.funtester.frame.execute.ThreadPoolUtil
import groovy.util.logging.Log4j2
import io.netty.bootstrap.Bootstrap
import io.netty.channel.*
import io.netty.channel.group.ChannelGroup
import io.netty.channel.group.DefaultChannelGroup
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.handler.codec.http.DefaultHttpHeaders
import io.netty.handler.codec.http.HttpClientCodec
import io.netty.handler.codec.http.HttpObjectAggregator
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory
import io.netty.handler.codec.http.websocketx.WebSocketVersion
import io.netty.handler.stream.ChunkedWriteHandler
import io.netty.util.concurrent.GlobalEventExecutor
@Log4j2
class WebSocketConnector {
static Bootstrap bootstrap = new Bootstrap()
static EventLoopGroup group = new NioEventLoopGroup(ThreadPoolUtil.getFactory("N"))
static {
bootstrap.group(group).channel(NioSocketChannel.class)
}
static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE)
WebSocketClientHandshaker handShaker
ChannelPromise handshakeFuture
String host
int port
Channel channel
WebSocketIoHandler handler
WebSocketConnector(String host, int port) {
this.host = host
this.port = port
String URL = this.host + ":" + this.port + "/test"
URI uri = new URI(URL)
handler = new WebSocketIoHandler(WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders()))
bootstrap.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_TIMEOUT, true)
.option(ChannelOption.SO_BROADCAST, true)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer
() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline()
pipeline.addLast(new HttpClientCodec())
pipeline.addLast(new ChunkedWriteHandler())
pipeline.addLast(new HttpObjectAggregator(1024 * 1024))
pipeline.addLast(handler)
}
})
}
void connect() {
try {
try {
ChannelFuture future = bootstrap.connect(this.host + "ws://" + "wss://", this.port).sync()
this.channel = future.channel()
clients.add(channel)
} catch (e) {
log.error("创建channel失败", e)
}
} catch (Exception e) {
log.error("连接服务失败", e)
} finally {
this.handshakeFuture = handler.handshakeFuture()
}
}
void close() {
this.channel.close()
}
}The ChannelGroup ( io.netty.channel.group.ChannelGroup ) automatically tracks all active channels and removes closed ones, simplifying resource management.
Two helper methods are added for sending messages:
/**
* Send a text message
*/
void sendText(String msg) {
channel.writeAndFlush(new TextWebSocketFrame(msg))
}
/**
* Send a ping message
*/
void ping() {
channel.writeAndFlush(new PingWebSocketFrame())
}Message handler
The handler extends io.netty.channel.SimpleChannelInboundHandler to process different WebSocketFrame types, such as text, close, and idle events, and to manage the handshake lifecycle.
package com.funtester.socket.netty
import groovy.util.logging.Log4j2
import io.netty.channel.*
import io.netty.channel.group.ChannelGroup
import io.netty.channel.group.DefaultChannelGroup
import io.netty.handler.codec.http.FullHttpResponse
import io.netty.handler.codec.http.websocketx.*
import io.netty.handler.timeout.IdleState
import io.netty.handler.timeout.IdleStateEvent
import io.netty.util.concurrent.GlobalEventExecutor
/**
* WebSocket client IO handler
*/
@Log4j2
class WebSocketIoHandler extends SimpleChannelInboundHandler
{
private ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE)
private final WebSocketClientHandshaker handShaker
private ChannelPromise handshakeFuture
WebSocketIoHandler(WebSocketClientHandshaker handShaker) {
this.handShaker = handShaker
}
ChannelFuture handshakeFuture() { return handshakeFuture }
@Override
void handlerAdded(ChannelHandlerContext ctx) {
handshakeFuture = ctx.newPromise()
}
@Override
void channelActive(ChannelHandlerContext ctx) {
handShaker.handshake(ctx.channel())
}
@Override
void channelInactive(ChannelHandlerContext ctx) {
ctx.close()
try { super.channelInactive(ctx) } catch (Exception e) { log.error("channelInactive 异常.", e) }
log.warn("WebSocket链路与服务器连接已断开.")
}
@Override
void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
Channel ch = ctx.channel()
if (!handShaker.isHandshakeComplete()) {
try {
handShaker.finishHandshake(ch, (FullHttpResponse) msg)
handshakeFuture.setSuccess()
} catch (WebSocketHandshakeException e) {
log.warn("WebSocket Client failed to connect", e)
handshakeFuture.setFailure(e)
}
return
}
WebSocketFrame frame = (WebSocketFrame) msg
if (frame instanceof TextWebSocketFrame) {
String s = ((TextWebSocketFrame) frame).text()
} else if (frame instanceof CloseWebSocketFrame) {
log.info("WebSocket Client closing")
ch.close()
}
}
@Override
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("WebSocket链路由于发生异常,与服务器连接已断开.", cause)
if (!handshakeFuture.isDone()) {
handshakeFuture.setFailure(cause)
}
ctx.close()
super.exceptionCaught(ctx, cause)
}
@Override
void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt
if (IdleState.WRITER_IDLE == event.state() || IdleState.READER_IDLE == event.state()) {
def channel = ctx.channel()
channel.writeAndFlush(new TextWebSocketFrame("dsf"))
}
} else {
super.userEventTriggered(ctx, evt)
}
}
}The implementation deliberately omits message persistence because the primary use case is testing massive (>10,000) concurrent connections, where storing responses is not essential.
Future work will compare the resource consumption of three socket client implementations—Go, Java Netty, and traditional Socket—under ultra‑high connection loads.
FunTester
10k followers, 1k articles | completely useless
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.