How to Integrate Spring Boot with Netty Using Protobuf for Efficient Data Transfer
This article walks through integrating Spring Boot with Netty and using Google Protobuf for binary data exchange, covering Protobuf basics, Maven setup, code generation, server and client implementations, heartbeat handling, reconnection logic, and testing results, all with complete Java examples.
Introduction
This article explains how to combine Spring Boot with Netty and use Protobuf for data transmission. Protobuf is a language‑independent binary format that is faster than XML and suitable for distributed applications.
Protobuf
Overview
Protocol Buffers (PB) is Google’s binary data exchange format, independent of language and platform. Implementations exist for Java, C#, C++, Go, and Python. Because it is binary, it is much faster than XML and can be used for network transmission, configuration files, data storage, etc.
Official repository: https://github.com/google/protobuf
Usage in Java
Create a proto file (using proto3 syntax) to define the data structure, e.g., a user with id, name, age, and state. Note that the proto file name and the generated Java class name must differ.
syntax = "proto3";
option java_package = "com.pancm.protobuf";
option java_outer_classname = "UserInfo";
message UserMsg {
int32 id = 1;
string name = 2;
int32 age = 3;
int32 state = 4;
}Compile the proto file with protoc.exe --java_out=E:\protobuf User.proto to generate the Java classes and place them in the project.
Example code to serialize and deserialize:
// Build a message
UserInfo.UserMsg.Builder userInfo = UserInfo.UserMsg.newBuilder();
userInfo.setId(1);
userInfo.setName("xuwujing");
userInfo.setAge(18);
UserInfo.UserMsg userMsg = userInfo.build();
// Serialize
ByteArrayOutputStream output = new ByteArrayOutputStream();
userMsg.writeTo(output);
byte[] byteArray = output.toByteArray();
// Deserialize
ByteArrayInputStream input = new ByteArrayInputStream(byteArray);
UserInfo.UserMsg userInfo2 = UserInfo.UserMsg.parseFrom(input);
System.out.println("id:" + userInfo2.getId());
System.out.println("name:" + userInfo2.getName());
System.out.println("age:" + userInfo2.getAge());Spring Boot Integration with Netty
Requirements: JDK 1.8, Netty 4.x, Protobuf 3.x.
Maven dependencies:
<properties>
<java.version>1.8</java.version>
<netty.version>4.1.22.Final</netty.version>
<protobuf.version>3.5.1</protobuf.version>
<springboot>1.5.9.RELEASE</springboot>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<version>${springboot}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
</dependency>
</dependencies>Server Code
Server is managed by Spring Boot. The Netty server class:
@Service("nettyServer")
public class NettyServer {
private static final int port = 9876;
private static EventLoopGroup boss = new NioEventLoopGroup();
private static EventLoopGroup work = new NioEventLoopGroup();
private static ServerBootstrap b = new ServerBootstrap();
@Autowired
private NettyServerFilter nettyServerFilter;
public void run() {
try {
b.group(boss, work);
b.channel(NioServerSocketChannel.class);
b.childHandler(nettyServerFilter);
ChannelFuture f = b.bind(port).sync();
System.out.println("服务端启动成功,端口是:" + port);
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
work.shutdownGracefully();
boss.shutdownGracefully();
}
}
}Server filter (pipeline configuration):
@Component
public class NettyServerFilter extends ChannelInitializer<SocketChannel> {
@Autowired
private NettyServerHandler nettyServerHandler;
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline ph = ch.pipeline();
ph.addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
ph.addLast(new ProtobufVarint32FrameDecoder());
ph.addLast(new ProtobufDecoder(UserMsg.getDefaultInstance()));
ph.addLast(new ProtobufVarint32LengthFieldPrepender());
ph.addLast(new ProtobufEncoder());
ph.addLast("nettyServerHandler", nettyServerHandler);
}
}Server handler implements business logic, heart‑beat detection, and message processing. It sends a protobuf message when a client connects and handles idle events to close inactive channels.
@Service("nettyServerHandler")
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
private int idle_count = 1;
private int count = 1;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("连接的客户端地址:" + ctx.channel().remoteAddress());
UserInfo.UserMsg userMsg = UserInfo.UserMsg.newBuilder()
.setId(1).setAge(18).setName("xuwujing").setState(0).build();
ctx.writeAndFlush(userMsg);
super.channelActive(ctx);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object obj) throws Exception {
if (obj instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) obj;
if (IdleState.READER_IDLE.equals(event.state())) {
System.out.println("已经5秒没有接收到客户端的信息了");
if (idle_count > 1) {
System.out.println("关闭这个不活跃的channel");
ctx.channel().close();
}
idle_count++;
}
} else {
super.userEventTriggered(ctx, obj);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("第" + count + "次,服务端接受的消息:" + msg);
try {
if (msg instanceof UserMsg) {
UserInfo.UserMsg userState = (UserInfo.UserMsg) msg;
if (userState.getState() == 1) {
System.out.println("客户端业务处理成功!");
} else if (userState.getState() == 2) {
System.out.println("接受到客户端发送的心跳!");
} else {
System.out.println("未知命令!");
}
} else {
System.out.println("未知数据!" + msg);
return;
}
} finally {
ReferenceCountUtil.release(msg);
}
count++;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}Spring Boot application entry point:
@SpringBootApplication
public class NettyServerApp {
public static void main(String[] args) {
ApplicationContext context = SpringApplication.run(NettyServerApp.class, args);
NettyServer nettyServer = context.getBean(NettyServer.class);
nettyServer.run();
}
}Client Code
The client mirrors the server pipeline and adds a heartbeat writer (IdleStateHandler with write timeout). It reconnects automatically if the connection is lost.
public void doConnect(Bootstrap bootstrap, EventLoopGroup eventLoopGroup) {
try {
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(nettyClientFilter)
.remoteAddress(host, port);
ChannelFuture f = bootstrap.connect().addListener((ChannelFuture futureListener) -> {
if (!futureListener.isSuccess()) {
System.out.println("与服务端断开连接!在10s之后准备尝试重连!");
futureListener.channel().eventLoop().schedule(() -> doConnect(new Bootstrap(), eventLoopGroup), 10, TimeUnit.SECONDS);
}
});
f.channel().closeFuture().sync();
} catch (Exception e) {
System.out.println("客户端连接失败!" + e.getMessage());
}
}Client handler sends a heartbeat every 4 seconds and processes incoming protobuf messages.
@Service("nettyClientHandler")
@ChannelHandler.Sharable
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
private int fcount = 1;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("建立连接时:" + new Date());
ctx.fireChannelActive();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("关闭连接时:" + new Date());
final EventLoop eventLoop = ctx.channel().eventLoop();
nettyClient.doConnect(new Bootstrap(), eventLoop);
super.channelInactive(ctx);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object obj) throws Exception {
if (obj instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) obj;
if (IdleState.WRITER_IDLE.equals(event.state())) {
UserMsg.Builder userState = UserMsg.newBuilder().setState(2);
ctx.channel().writeAndFlush(userState);
fcount++;
}
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (!(msg instanceof UserMsg)) {
System.out.println("未知数据!" + msg);
return;
}
try {
UserInfo.UserMsg userMsg = (UserInfo.UserMsg) msg;
System.out.println("客户端接受到的用户信息。编号:" + userMsg.getId() + ",姓名:" + userMsg.getName() + ",年龄:" + userMsg.getAge());
UserMsg.Builder userState = UserMsg.newBuilder().setState(1);
ctx.writeAndFlush(userState);
System.out.println("成功发送给服务端!");
} finally {
ReferenceCountUtil.release(msg);
}
}
}Testing
Start the server, then the client. The server logs connection, received messages, and heartbeat detection. The client logs connection time, received user info, and periodic heartbeat sends. Reconnection logic is demonstrated by starting the client before the server; the client retries after 10 seconds and resumes communication.
Conclusion
The tutorial shows how to integrate Spring Boot with Netty and use Protobuf for efficient binary data transfer, providing complete Maven configuration, source code, and testing steps. Project source code is available at GitHub .
Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactand we will review it promptly.
Programmer DD
A tinkering programmer and author of "Spring Cloud Microservices in Action"
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
