How to Build a Scalable WebSocket Service with Netty, Spring Boot, and Vue2
This guide walks through creating a full‑stack WebSocket solution using Netty for the server, Spring Boot for lifecycle management, Redis for token validation, and Vue2 on the client, covering configuration, token handling, heartbeat detection, channel management, and message broadcasting with complete code examples.
Background
The project uses the Ruoyi framework (Spring Boot) on the backend and Vue2 on the frontend. To enable real‑time communication, a WebSocket service based on Netty is required, along with token validation, heartbeat monitoring, and channel management.
Backend Architecture
Key components include:
NettyConfig : defines beans for ChannelManager, RedisService, EventLoopGroup, ChannelInitializer, and ServerBootstrap. It also sets up the WebSocket path, token keys, and heartbeat message.
NettyWebSocketServer : implements SmartLifecycle to start and stop the Netty server on Spring context refresh and shutdown, ensuring a single start using a running flag.
WebSocketPathParamHandler : parses the initial HTTP request to extract token and userId, stores them as channel attributes, validates the token against Redis, and sends a welcome message.
WebSocketFrameHandler : handles text frames, logs connection events, and cleans up channel mappings on disconnect.
ChannelManager : a thread‑safe component that maps channelId ↔ Channel, token ↔ channelId, and userId ↔ channelId. It provides methods to bind/unbind tokens, broadcast messages, and send to specific tokens or user IDs.
MessagePushService : a Spring @Service that exposes APIs for broadcasting, sending to tokens, sending to user IDs, and retrieving the online user map.
WebSocketController : REST endpoints for querying online users, broadcasting a message to all, and sending a message to selected users. It uses the MessagePushService and a remote user service to resolve user details.
Key Backend Code
package com.ndx.pad.netty.config;
import com.ruoyi.common.core.netty.service.ChannelManager;
import com.ruoyi.common.redis.service.RedisService;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.logging.*;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.TimeUnit;
@Slf4j
@Configuration
public class NettyConfig {
public static final String WEBSOCKET_PATH = "/ws";
public static final String TOKEN = "token";
public static final String USER_ID = "userId";
public static final AttributeKey<Map<String, String>> URL_PARAMS_KEY = AttributeKey.valueOf("url_params_key");
public static final AttributeKey<String> TOKEN_KEY = AttributeKey.valueOf(TOKEN);
public static final AttributeKey<String> USERID_KEY = AttributeKey.valueOf(USER_ID);
public static final String HEART_BEAT = "{\"type\":\"heartbeat\"}";
@Bean
public ChannelManager channelManager() { return new ChannelManager(); }
@Bean
public RedisService redisService() { return new RedisService(); }
@Bean(destroyMethod = "shutdownGracefully")
public EventLoopGroup bossGroup() { return new NioEventLoopGroup(1); }
@Bean(destroyMethod = "shutdownGracefully")
public EventLoopGroup workerGroup() { return new NioEventLoopGroup(); }
@Bean
public ChannelInitializer<SocketChannel> webSocketChannelInitializer(ChannelManager channelManager, RedisService redisService) {
return new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new HttpObjectAggregator(65536));
pipeline.addLast(new IdleStateHandler(40, 0, 0, TimeUnit.SECONDS));
pipeline.addLast(new WebSocketPathParamHandler(channelManager, redisService));
pipeline.addLast(new WebSocketServerProtocolHandler(WEBSOCKET_PATH, null, true));
pipeline.addLast(new WebSocketFrameHandler(channelManager));
}
};
}
@Bean
public ServerBootstrap serverBootstrap(EventLoopGroup bossGroup, EventLoopGroup workerGroup,
ChannelInitializer<SocketChannel> channelInitializer) {
return new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(channelInitializer)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
}
} package com.ndx.pad.netty.config;
import com.ruoyi.common.core.exception.CustomException;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.SmartLifecycle;
import org.springframework.stereotype.Component;
import java.util.Set;
@Slf4j
@Component
public class NettyWebSocketServer implements SmartLifecycle {
private static final Logger logger = LoggerFactory.getLogger(NettyWebSocketServer.class);
private final ServerBootstrap serverBootstrap;
private final EventLoopGroup bossGroup;
private final EventLoopGroup workerGroup;
private Channel channel;
private volatile boolean running = false;
public NettyWebSocketServer(ServerBootstrap serverBootstrap,
@Qualifier("bossGroup") EventLoopGroup bossGroup,
@Qualifier("workerGroup") EventLoopGroup workerGroup) {
this.serverBootstrap = serverBootstrap;
this.bossGroup = bossGroup;
this.workerGroup = workerGroup;
}
@Override
public void start() {
if (!running) {
try {
int port = 8111;
channel = serverBootstrap.bind(port).sync().channel();
running = true;
logger.info("Netty WebSocket server started on port :" + port);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new CustomException("Server start interrupted", e);
}
}
}
@Override
public void stop() {
if (channel != null) {
channel.close().addListener(future -> {
if (future.isSuccess()) {
logger.info("Server channel closed");
}
});
}
running = false;
}
@Override
public void stop(Runnable callback) { stop(); callback.run(); }
@Override
public boolean isRunning() { return running; }
@Override
public int getPhase() { return Integer.MAX_VALUE; }
} package com.ndx.pad.netty.config;
import com.ruoyi.common.core.netty.service.ChannelManager;
import com.ruoyi.common.redis.service.RedisService;
import io.netty.channel.*;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.Map;
@Slf4j
public class WebSocketPathParamHandler extends ChannelInboundHandlerAdapter {
private final ChannelManager channelManager;
private final RedisService redisService;
private static final String ACCESS_TOKEN = "login_tokens:";
private static final int MAX_LOST_HEARTBEAT = 3;
private int lostHeartbeatCount = 0;
public WebSocketPathParamHandler(ChannelManager channelManager, RedisService redisService) {
this.channelManager = channelManager;
this.redisService = redisService;
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
// handle heartbeat timeout
// omitted for brevity
} else {
super.userEventTriggered(ctx, evt);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof FullHttpRequest) {
FullHttpRequest request = (FullHttpRequest) msg;
String uri = request.uri();
if (uri.contains("?")) {
Map<String, String> paramMap = getUrlParams(uri);
String tokenValue = paramMap.get(NettyConfig.TOKEN);
String userIdValue = paramMap.get(NettyConfig.USER_ID);
ctx.channel().attr(NettyConfig.TOKEN_KEY).set(tokenValue);
ctx.channel().attr(NettyConfig.USERID_KEY).set(userIdValue);
ctx.channel().attr(NettyConfig.URL_PARAMS_KEY).set(paramMap);
ctx.writeAndFlush(new TextWebSocketFrame("首次连接,获取token:" + tokenValue + ",userId:" + userIdValue));
channelManager.bindTokenChannel(tokenValue, userIdValue, ctx.channel().id().asShortText(), ctx.channel());
if (StrUtil.isNotBlank(tokenValue) && StrUtil.isNotBlank(userIdValue)) {
boolean flag = redisService.redisTemplate.hasKey(ACCESS_TOKEN + tokenValue);
if (flag) {
request.setUri(NettyConfig.WEBSOCKET_PATH);
}
}
}
}
if (msg instanceof TextWebSocketFrame) {
TextWebSocketFrame frame = (TextWebSocketFrame) msg;
if (NettyConfig.HEART_BEAT.equals(frame.text())) {
lostHeartbeatCount = 0;
String userId = ctx.channel().attr(NettyConfig.USERID_KEY).get();
log.debug("Channel {} received heartbeat from user {}", ctx.channel().id().asShortText(), userId);
ctx.writeAndFlush(new TextWebSocketFrame(NettyConfig.HEART_BEAT));
return;
}
}
super.channelRead(ctx, msg);
}
private static Map<String, String> getUrlParams(String url) {
Map<String, String> map = new HashMap<>();
url = url.replace("?", ",");
if (!url.contains(",")) return map;
String[] arr = url.split(",")[1].split("&");
for (String s : arr) {
String[] kv = s.split("=");
if (kv.length == 2) map.put(kv[0], kv[1]);
}
return map;
}
} package com.ndx.pad.netty.config;
import com.ruoyi.common.core.netty.service.ChannelManager;
import io.netty.channel.*;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@ChannelHandler.Sharable
public class WebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
private final ChannelManager channelManager;
public WebSocketFrameHandler(ChannelManager channelManager) {
this.channelManager = channelManager;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) {
String text = msg.text();
log.debug("Channel {} received message: {}", ctx.channel().id().asShortText(), text);
// custom business logic can be added here
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.debug("Connection established, ChannelId: {}", ctx.channel().id().asShortText());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
String token = ctx.channel().attr(NettyConfig.TOKEN_KEY).get();
String userId = ctx.channel().attr(NettyConfig.USERID_KEY).get();
String channelId = ctx.channel().id().asShortText();
if (token != null && userId != null) {
channelManager.unbindToken(token, userId, channelId);
}
log.debug("Channel disconnected, token:{}, userId:{}, ChannelId:{}", token, userId, channelId);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("Channel exception, params:{}, ChannelId:{}, error:{}",
ctx.channel().attr(NettyConfig.URL_PARAMS_KEY).get(), ctx.channel().id().asShortText(), cause);
cause.printStackTrace();
ctx.close();
}
} package com.ruoyi.common.core.netty.service;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@Slf4j
@Component
public class ChannelManager {
private final ConcurrentMap<String, Channel> CHANNEL_MAP = new ConcurrentHashMap<>();
private final ConcurrentMap<String, String> TOKEN_CHANNEL_MAP = new ConcurrentHashMap<>();
private final ConcurrentMap<String, Set<String>> USERID_CHANNEL_MAP = new ConcurrentHashMap<>();
public void bindTokenChannel(String token, String userId, String channelId, Channel channel) {
log.debug("Binding token {} to channelId {}", token, channelId);
TOKEN_CHANNEL_MAP.put(token, channelId);
USERID_CHANNEL_MAP.computeIfAbsent(userId, k -> new HashSet<>()).add(channelId);
CHANNEL_MAP.put(channelId, channel);
}
public void unbindToken(String token, String userId, String channelId) {
log.debug("Unbinding token {} from channelId {}", token, channelId);
CHANNEL_MAP.remove(channelId);
TOKEN_CHANNEL_MAP.remove(token);
Set<String> set = USERID_CHANNEL_MAP.get(userId);
if (set != null) set.remove(channelId);
}
public void broadcast(String message) {
CHANNEL_MAP.values().forEach(ch -> {
if (ch.isActive()) {
ch.writeAndFlush(new TextWebSocketFrame(message));
}
});
}
public void sendToTokens(String message, String... tokens) {
for (String token : tokens) {
String channelId = TOKEN_CHANNEL_MAP.get(token);
if (channelId != null) {
Channel ch = CHANNEL_MAP.get(channelId);
if (ch != null && ch.isActive()) {
ch.writeAndFlush(new TextWebSocketFrame(message));
}
}
}
}
public void sendToUserIds(String message, String... userIds) {
for (String userId : userIds) {
Set<String> ids = USERID_CHANNEL_MAP.get(userId);
if (ids != null) {
for (String cid : ids) {
Channel ch = CHANNEL_MAP.get(cid);
if (ch != null && ch.isActive()) {
ch.writeAndFlush(new TextWebSocketFrame(message));
}
}
}
}
}
public ConcurrentMap<String, Set<String>> getUserIdChannelMap() { return USERID_CHANNEL_MAP; }
} package com.ruoyi.common.core.netty.service;
import com.ruoyi.common.core.netty.vo.TextMsgVO;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
@Service
public class MessagePushService {
@Autowired
private ChannelManager channelManager;
public ConcurrentMap<String, Set<String>> getOnlineUserList() {
return channelManager.getUserIdChannelMap();
}
public void broadcast(String message) {
channelManager.broadcast(message);
}
public void sendToTokens(String message, String... tokens) {
channelManager.sendToTokens(message, tokens);
}
public void sendToUserIds(String message, String... userIds) {
channelManager.sendToUserIds(message, userIds);
}
public void sendMsgByUserIdList(TextMsgVO vo) {
channelManager.sendMsgByUserIdList(vo);
}
} package com.ndx.pad.common.controller;
import com.ruoyi.common.core.netty.service.MessagePushService;
import com.ruoyi.common.core.netty.vo.TextMsgVO;
import com.ruoyi.common.core.web.controller.BaseController;
import com.ruoyi.common.core.web.domain.AjaxResult;
import com.ruoyi.common.core.web.page.TableDataInfo;
import com.ruoyi.system.api.RemoteUserService;
import com.ruoyi.system.api.domain.SysUser;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.*;
import java.util.stream.Collectors;
@RestController
@RequestMapping("/webSocket")
public class WebSocketController extends BaseController {
@Autowired
private MessagePushService messagePushService;
@Autowired
private RemoteUserService remoteUserService;
@GetMapping("/onlineUserList")
public TableDataInfo onlineUserList() {
ConcurrentMap<String, Set<String>> onlineMap = messagePushService.getOnlineUserList();
if (onlineMap.isEmpty()) return getDataTable(new ArrayList<>());
Long[] ids = onlineMap.keySet().stream().map(Long::valueOf).toArray(Long[]::new);
SysUser user = new SysUser();
user.setUserIds(ids);
TableDataInfo info = remoteUserService.queryUserList(user);
return info.getCode() == 200 && !info.getRows().isEmpty() ? getDataTable(info.getRows()) : getDataTable(new ArrayList<>());
}
@PostMapping("/sendMsgToAll")
public AjaxResult sendMsgToAll(@RequestBody String msg) {
messagePushService.broadcast(msg);
return AjaxResult.success();
}
@PostMapping("/sendMsgByUserIdList")
public AjaxResult sendMsgByUserIdList(@RequestBody TextMsgVO vo) {
messagePushService.sendMsgByUserIdList(vo);
return AjaxResult.success();
}
}Frontend Architecture
The Vue2 client uses a Vuex module to manage the WebSocket connection, automatic reconnection, and heartbeat checks. It also integrates an event bus for broadcasting received messages to any component.
Key Frontend Files
websocket.js : Vuex module that creates the WebSocket instance, handles onopen, onmessage, onclose, and onerror, sends heartbeat messages every 30 seconds, and provides actions to start, reconnect, and clear the connection.
common.js : API wrappers for onlineUserList, sendMsgToAll, and sendMsgByUserIdList using the project's request utility.
bus.js : a simple Vue instance used as a global event bus.
main.js : registers the bus on Vue.prototype.$bus and exposes the API functions globally.
App.vue (mounted hook) : dispatches reconnectWebSocket on page load and listens for the custom textMessageEvent to show a notification when a message arrives.
Frontend Code Samples
import { Notification } from 'element-ui';
import { getToken } from '@/utils/auth';
import user from '@/store/modules/user';
import eventBus from '@/utils/bus';
const reconnectInterval = 5000;
const heartCheckInterval = 30000;
const heartCheckMessage = '{"type":"heartbeat"}';
const state = {
userId: null,
ws: null,
heartCheckTimer: null,
textMessageEvent: 'textMessageEvent'
};
const mutations = {
SET_WS(state, { ws, userId }) { state.userId = userId; state.ws = ws; },
CLEAR_WS(state) { state.ws = null; },
SET_HEART_CHECK_TIMER(state, timer) { state.heartCheckTimer = timer; }
};
const actions = {
startWebSocket({ commit, dispatch, state }) {
if (state.ws) {
const stateMap = { [WebSocket.CONNECTING]: 'Connecting', [WebSocket.OPEN]: 'Open', [WebSocket.CLOSING]: 'Closing', [WebSocket.CLOSED]: 'Closed' };
console.log(`Current state: ${stateMap[state.ws.readyState]}`);
if (state.ws.readyState === WebSocket.OPEN) return;
}
const userId = user.state.userId;
if (userId && getToken()) {
dispatch('clearWebSocket');
const socketUrl = `${process.env.VUE_APP_SOCKET_PATH}/ws?token=${getToken()}&userId=${userId}`;
const ws = new WebSocket(socketUrl);
ws.onmessage = e => {
console.log(`Message received: ${e.data}`);
if (e.data !== heartCheckMessage) eventBus.$emit(state.textMessageEvent, e.data);
};
ws.onclose = () => {
console.log('WebSocket closed');
dispatch('reconnectWebSocket');
};
ws.onopen = () => {
console.log('WebSocket opened');
commit('SET_WS', { ws, userId });
dispatch('startHeartCheck');
};
ws.onerror = e => console.error('WebSocket error', e);
}
},
sendWebSocketMessage({ state }, msg) { state.ws.send(msg); },
reconnectWebSocket({ dispatch }) {
console.log('Attempting reconnection');
dispatch('clearWebSocket');
setTimeout(() => dispatch('startWebSocket'), reconnectInterval);
},
clearWebSocket({ commit, state, dispatch }) {
dispatch('clearHeartCheckTimer');
if (state.ws) {
state.ws.onclose = null;
state.ws.onerror = null;
state.ws.onmessage = null;
state.ws.onopen = null;
if (state.ws.readyState === WebSocket.OPEN || state.ws.readyState === WebSocket.CONNECTING) {
state.ws.close();
}
commit('CLEAR_WS');
}
},
startHeartCheck({ commit, dispatch, state }) {
dispatch('clearHeartCheckTimer');
if (state.ws?.readyState === WebSocket.OPEN) dispatch('sendWebSocketMessage', heartCheckMessage);
const timer = setInterval(() => {
if (!state.ws || state.ws.readyState !== WebSocket.OPEN) {
console.log('Heartbeat failed, reconnecting');
dispatch('reconnectWebSocket');
} else {
dispatch('sendWebSocketMessage', heartCheckMessage);
}
}, heartCheckInterval);
commit('SET_HEART_CHECK_TIMER', timer);
},
clearHeartCheckTimer({ commit, state }) {
if (state.heartCheckTimer) {
clearInterval(state.heartCheckTimer);
commit('SET_HEART_CHECK_TIMER', null);
}
}
};
export default { state, mutations, actions }; export function onlineUserList(query) {
return request({ url: '/pad/webSocket/onlineUserList', method: 'get', params: query });
}
export function sendMsgToAll(data) {
return request({ url: '/pad/webSocket/sendMsgToAll', method: 'post', data });
}
export function sendMsgByUserIdList(data) {
return request({ url: '/pad/webSocket/sendMsgByUserIdList', method: 'post', data });
} import Vue from 'vue';
export default new Vue(); import { sendMsgByUserIdList, sendMsgToAll, onlineUserList } from '@/api/tool/common';
import Bus from './utils/bus';
Vue.prototype.$bus = Bus;
Vue.prototype.sendMsgByUserIdList = sendMsgByUserIdList;
Vue.prototype.sendMsgToAll = sendMsgToAll;
Vue.prototype.onlineUserList = onlineUserList; mounted() {
this.$store.dispatch('reconnectWebSocket');
const eventName = this.$store.state.webSocket.textMessageEvent;
this.$bus.$on(eventName, data => {
Notification({ type: 'success', title: 'New Message', message: data, position: 'top-right', duration: 3000, showClose: true });
});
}Why This Approach Works
SmartLifecycle guarantees that the Netty server starts after Spring context initialization and shuts down cleanly.
Separating Netty ChannelHandler creation from Spring allows constructor injection of ChannelManager and RedisService, enabling token validation and user‑channel mapping.
Heartbeat messages (both client‑side and server‑side) detect dead connections and trigger automatic reconnection.
The Vuex module prevents duplicate connections, clears timers on disconnect, and provides a global event bus for UI components to react to incoming messages.
Conclusion
By combining Netty’s high‑performance networking with Spring Boot’s lifecycle management and Vue2’s reactive UI, developers can build a robust, token‑secured WebSocket system that supports single‑user, multi‑user, and broadcast messaging, while handling connection health and reconnection automatically.
Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactand we will review it promptly.
Selected Java Interview Questions
A professional Java tech channel sharing common knowledge to help developers fill gaps. Follow us!
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
