Distributed WebSocket Messaging with Redis and Kafka in Spring
This article explains how to enable cross‑server WebSocket communication in a distributed Spring application by using a message queue such as Redis or Kafka, detailing two architectural approaches, concrete code examples, configuration files, and a sample client page.
Last week a colleague on Knowledge Planet asked a question about WebSocket in a distributed environment: when Nginx load‑balances users across multiple servers, how can a user connected to one server send a message to a user connected to another server?
The answer is to implement distributed WebSocket, which can be achieved by either of the following two solutions:
Publish messages (containing userId and content) to a message‑queue topic (e.g., Redis, Kafka). Each application node subscribes to the topic, extracts the receiver’s user ID, checks whether the corresponding WebSocket connection exists locally, and forwards the message if it does.
After a user establishes a WebSocket connection, record the node where the connection resides in Redis. Then use a message queue to push messages directly to the node that holds the receiver’s connection (more complex but reduces network traffic).
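The second approach can be sketched in plain Java. This is a minimal in-memory model, not Spring or Redis code: the `userToNode` map stands in for the Redis record of where each connection lives, and each node's `inbox` stands in for a per-node queue or topic. All names (`DirectedRouter`, `Node`, `register`, `unregister`, `route`) are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** In-memory model of approach 2: look up the receiver's node, then push only to that node. */
class DirectedRouter {

    /** Stand-in for one application node with its own inbound queue. */
    static class Node {
        final String name;
        final List<String> inbox = new ArrayList<>();

        Node(String name) {
            this.name = name;
        }
    }

    // Stand-in for the Redis record "userId -> node that holds the WebSocket connection".
    private final Map<String, Node> userToNode = new HashMap<>();

    /** Called when a user establishes a WebSocket connection on a node. */
    void register(String userId, Node node) {
        userToNode.put(userId, node);
    }

    /** Called when the connection closes, so stale routes are not used. */
    void unregister(String userId) {
        userToNode.remove(userId);
    }

    /** Returns true if the message was pushed to a node, false if the receiver is offline. */
    boolean route(String receiver, String content) {
        Node target = userToNode.get(receiver);
        if (target == null) {
            return false;
        }
        target.inbox.add(receiver + ":" + content);
        return true;
    }

    public static void main(String[] args) {
        Node a = new Node("node-a");
        Node b = new Node("node-b");
        DirectedRouter router = new DirectedRouter();
        router.register("alice", a);
        router.register("bob", b);

        router.route("bob", "hello");
        System.out.println(a.inbox.size() + " " + b.inbox.size()); // prints "0 1"
    }
}
```

Only the node that holds the receiver's connection sees the message, which is where the traffic saving comes from. The cost is the extra bookkeeping: the routing record has to be updated on every connect and disconnect.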
Implementation Plan
The following demonstrates the first solution in detail.
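Before the Spring code, the idea behind the first solution can be modelled in a few lines of plain Java. This is only a sketch under simplifying assumptions: `BroadcastTopic` stands in for the Redis or Kafka topic, `Node` for an application server, and `localSessions` for the WebSocket connections held by that server. None of these names come from the article's project.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** In-memory model of approach 1: every node receives every message and delivers only to local users. */
class BroadcastTopic {

    /** Stand-in for one application node and the WebSocket sessions it holds. */
    static class Node {
        // userId -> messages delivered over that user's local WebSocket connection
        final Map<String, List<String>> localSessions = new HashMap<>();

        void connect(String userId) {
            localSessions.put(userId, new ArrayList<>());
        }

        /** Subscriber callback: forward the message only if the receiver is connected here. */
        void onMessage(String receiver, String content) {
            List<String> session = localSessions.get(receiver);
            if (session != null) {
                session.add(content);
            }
        }
    }

    private final List<Node> subscribers = new ArrayList<>();

    void subscribe(Node node) {
        subscribers.add(node);
    }

    /** Publishing fans the message out to all subscribed nodes. */
    void publish(String receiver, String content) {
        for (Node node : subscribers) {
            node.onMessage(receiver, content);
        }
    }

    public static void main(String[] args) {
        BroadcastTopic topic = new BroadcastTopic();
        Node nodeA = new Node();
        Node nodeB = new Node();
        topic.subscribe(nodeA);
        topic.subscribe(nodeB);
        nodeA.connect("alice");
        nodeB.connect("bob");

        topic.publish("bob", "hi bob");
        System.out.println(nodeB.localSessions.get("bob")); // prints [hi bob]
    }
}
```

Every node does a lookup for every message, so the work grows with the number of nodes, but no node needs to know where any user is connected.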
1. Define a WebSocket Channel enum class
public enum WebSocketChannelEnum {
// Simple point‑to‑point chat for testing
CHAT("CHAT", "Simple point‑to‑point chat", "/topic/reply");
WebSocketChannelEnum(String code, String description, String subscribeUrl) {
this.code = code;
this.description = description;
this.subscribeUrl = subscribeUrl;
}
/** Unique CODE */
private String code;
/** Description */
private String description;
/** WebSocket client subscription URL */
private String subscribeUrl;
public String getCode() { return code; }
public String getDescription() { return description; }
public String getSubscribeUrl() { return subscribeUrl; }
/** Find enum by CODE */
public static WebSocketChannelEnum fromCode(String code) {
if (StringUtils.isNoneBlank(code)) {
for (WebSocketChannelEnum channelEnum : values()) {
if (channelEnum.code.equals(code)) {
return channelEnum;
}
}
}
return null;
}
}
2. Configure a Redis‑based message queue
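Redis Pub/Sub is fire‑and‑forget: a message reaches only the subscribers connected at publish time and is not persisted. For medium‑to‑large production projects, Redis is therefore not recommended as a reliable message queue; consider dedicated messaging middleware such as Kafka or RabbitMQ instead.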
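To make the reliability concern concrete, here is a small plain-Java model contrasting two delivery styles. It is an illustration only, with hypothetical class names: `PubSubChannel` behaves like a fire-and-forget channel that delivers to whoever is subscribed at publish time, while `RetainedLog` keeps messages so a consumer can resume from an offset. It does not use the Redis or Kafka client APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Two simplified delivery models, to show why a disconnected subscriber can miss messages. */
class DeliveryModels {

    /** Fire-and-forget channel: a message reaches only the listeners subscribed at publish time. */
    static class PubSubChannel {
        private final List<Consumer<String>> listeners = new ArrayList<>();

        void subscribe(Consumer<String> listener) {
            listeners.add(listener);
        }

        void unsubscribe(Consumer<String> listener) {
            listeners.remove(listener);
        }

        void publish(String message) {
            for (Consumer<String> listener : listeners) {
                listener.accept(message);
            }
        }
    }

    /** Retained log: messages are stored, and a consumer reads from its own offset. */
    static class RetainedLog {
        private final List<String> entries = new ArrayList<>();

        void append(String message) {
            entries.add(message);
        }

        List<String> readFrom(int offset) {
            return new ArrayList<>(entries.subList(offset, entries.size()));
        }
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        Consumer<String> node = received::add;

        PubSubChannel channel = new PubSubChannel();
        channel.subscribe(node);
        channel.publish("m1");
        channel.unsubscribe(node); // the node restarts or loses its connection
        channel.publish("m2");     // nobody is listening, so m2 is dropped
        channel.subscribe(node);
        channel.publish("m3");
        System.out.println(received); // prints [m1, m3]

        RetainedLog log = new RetainedLog();
        log.append("m1");
        log.append("m2");
        log.append("m3");
        System.out.println(log.readFrom(1)); // prints [m2, m3]
    }
}
```

In the first model the message published during the gap is simply gone; in the second, the consumer that processed one message before the gap can pick up at offset 1 and still see the rest.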
@Configuration
@ConditionalOnClass({JedisCluster.class})
public class RedisConfig {
@Value("${spring.redis.timeout}")
private String timeOut;
@Value("${spring.redis.cluster.nodes}")
private String nodes;
@Value("${spring.redis.cluster.max-redirects}")
private int maxRedirects;
@Value("${spring.redis.jedis.pool.max-active}")
private int maxActive;
@Value("${spring.redis.jedis.pool.max-wait}")
private int maxWait;
@Value("${spring.redis.jedis.pool.max-idle}")
private int maxIdle;
@Value("${spring.redis.jedis.pool.min-idle}")
private int minIdle;
@Value("${spring.redis.message.topic-name}")
private String topicName;
@Bean
public JedisPoolConfig jedisPoolConfig() {
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxTotal(maxActive);
config.setMaxIdle(maxIdle);
config.setMinIdle(minIdle);
config.setMaxWaitMillis(maxWait);
return config;
}
@Bean
public RedisClusterConfiguration redisClusterConfiguration() {
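// spring.redis.cluster.nodes is one comma-separated string; split it into individual host:port entries
RedisClusterConfiguration configuration = new RedisClusterConfiguration(Arrays.asList(nodes.split(",")));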
configuration.setMaxRedirects(maxRedirects);
return configuration;
}
/** JedisConnectionFactory */
@Bean
public JedisConnectionFactory jedisConnectionFactory(RedisClusterConfiguration configuration, JedisPoolConfig jedisPoolConfig) {
return new JedisConnectionFactory(configuration, jedisPoolConfig);
}
/** Use Jackson to serialize objects */
@Bean
public Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer() {
Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<>(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
serializer.setObjectMapper(objectMapper);
return serializer;
}
/** RedisTemplate */
@Bean
public RedisTemplate<String, Object> redisTemplate(JedisConnectionFactory factory, Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(factory);
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
redisTemplate.setKeySerializer(stringRedisSerializer);
redisTemplate.setHashKeySerializer(stringRedisSerializer);
redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
/** Message listener */
@Bean
MessageListenerAdapter messageListenerAdapter(MessageReceiver messageReceiver, Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer) {
MessageListenerAdapter adapter = new MessageListenerAdapter(messageReceiver, "receiveMessage");
adapter.setSerializer(jackson2JsonRedisSerializer);
return adapter;
}
/** Message listener container */
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter messageListenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(messageListenerAdapter, new PatternTopic(topicName));
return container;
}
}
Corresponding Spring configuration (application.yml) example:
spring:
  ...
  # redis
  redis:
    cluster:
      nodes: namenode22:6379,datanode23:6379,datanode24:6379
      max-redirects: 6
    timeout: 300000
    jedis:
      pool:
        max-active: 8
        max-wait: 100000
        max-idle: 8
        min-idle: 0
    # custom topic path
    message:
      topic-name: topic-test
3. Define a Redis message handler
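The handler in this step reads three fields from `RedisWebsocketMsg`, a class the article does not list. The following is a minimal sketch of what such an envelope might look like, inferred only from the calls that appear in the article (`getReceiver()`, `getChannelCode()`, `getContent()`, and the three-argument constructor). The field types, the setters, and the no-argument constructor are assumptions; in a real project the class would normally be `public` in its own file.

```java
/** Hypothetical message envelope published to the Redis topic; the article does not show this class. */
class RedisWebsocketMsg<T> {

    /** User name of the intended receiver. */
    private String receiver;
    /** Code of the WebSocketChannelEnum entry that identifies the subscription URL. */
    private String channelCode;
    /** Message body forwarded to the client. */
    private T content;

    /** No-argument constructor, assumed so that a JSON mapper can instantiate the class. */
    RedisWebsocketMsg() {
    }

    RedisWebsocketMsg(String receiver, String channelCode, T content) {
        this.receiver = receiver;
        this.channelCode = channelCode;
        this.content = content;
    }

    String getReceiver() { return receiver; }
    void setReceiver(String receiver) { this.receiver = receiver; }

    String getChannelCode() { return channelCode; }
    void setChannelCode(String channelCode) { this.channelCode = channelCode; }

    T getContent() { return content; }
    void setContent(T content) { this.content = content; }

    @Override
    public String toString() {
        return "RedisWebsocketMsg{receiver=" + receiver
                + ", channelCode=" + channelCode
                + ", content=" + content + "}";
    }

    public static void main(String[] args) {
        RedisWebsocketMsg<String> msg = new RedisWebsocketMsg<>("bob", "CHAT", "hello");
        System.out.println(msg); // prints RedisWebsocketMsg{receiver=bob, channelCode=CHAT, content=hello}
    }
}
```

Carrying the channel code rather than the subscription URL keeps the envelope small and lets each node resolve the URL through `WebSocketChannelEnum.fromCode`.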
@Component
public class MessageReceiver {
private final Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private SimpMessagingTemplate messagingTemplate;
@Autowired
private SimpUserRegistry userRegistry;
/** Process WebSocket messages */
public void receiveMessage(RedisWebsocketMsg redisWebsocketMsg) {
logger.info(MessageFormat.format("Received Message: {0}", redisWebsocketMsg));
// 1. Get the user and check if the connection exists on this node
SimpUser simpUser = userRegistry.getUser(redisWebsocketMsg.getReceiver());
if (simpUser != null && StringUtils.isNoneBlank(simpUser.getName())) {
// 2. Get the subscription URL of the client
WebSocketChannelEnum channelEnum = WebSocketChannelEnum.fromCode(redisWebsocketMsg.getChannelCode());
if (channelEnum != null) {
// 3. Send the message to the client
messagingTemplate.convertAndSendToUser(redisWebsocketMsg.getReceiver(), channelEnum.getSubscribeUrl(), redisWebsocketMsg.getContent());
}
}
}
}
4. Send WebSocket messages from a Controller
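The controller in this step picks one of three delivery paths: send locally if the receiver is connected to this node, publish to the Redis topic if the receiver is connected to some other node, or park the message in a Redis list if the receiver is offline. The plain-Java sketch below isolates that decision and the unread-message key format. `DeliveryPlanner`, `Route`, `UnreadStore`, and the `"unread:"` prefix are hypothetical; the article does not show the value of `Constants.REDIS_UNREAD_MSG_PREFIX`, and the in-memory map only stands in for Redis.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Models the three-way delivery decision and the unread-message store. */
class DeliveryPlanner {

    enum Route { LOCAL, PUBLISH_TO_TOPIC, STORE_UNREAD }

    /** Chooses a delivery path from two facts about the receiver. */
    static Route decide(boolean connectedHere, boolean connectedOnSomeNode) {
        if (connectedHere) {
            return Route.LOCAL;
        }
        if (connectedOnSomeNode) {
            return Route.PUBLISH_TO_TOPIC;
        }
        return Route.STORE_UNREAD;
    }

    /** Builds the list key: prefix + receiver + ":" + destination. */
    static String unreadKey(String prefix, String receiver, String destination) {
        return prefix + receiver + ":" + destination;
    }

    /** In-memory stand-in for the Redis lists that hold unread messages. */
    static class UnreadStore {
        private final Map<String, List<String>> lists = new HashMap<>();

        void append(String key, String payload) {
            lists.computeIfAbsent(key, k -> new ArrayList<>()).add(payload);
        }

        /** Returns and removes everything stored under the key in a single step. */
        List<String> drain(String key) {
            List<String> messages = lists.remove(key);
            return messages == null ? new ArrayList<>() : messages;
        }
    }

    public static void main(String[] args) {
        System.out.println(decide(false, true)); // prints PUBLISH_TO_TOPIC

        String key = unreadKey("unread:", "bob", "/topic/reply");
        UnreadStore store = new UnreadStore();
        store.append(key, "first");
        store.append(key, "second");
        System.out.println(key + " -> " + store.drain(key)); // prints unread:bob:/topic/reply -> [first, second]
    }
}
```

One detail worth checking in the real controller: `pullUnreadMessage` reads the list and deletes it in two separate Redis calls, so a message appended between the two calls could be deleted without ever being returned. The `drain` method above reads and removes in one step to show the intended behaviour.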
@Controller
@RequestMapping("/wsTemplate")
public class RedisMessageController {
private final Logger logger = LoggerFactory.getLogger(getClass());
@Value("${spring.redis.message.topic-name}")
private String topicName;
@Autowired
private SimpMessagingTemplate messagingTemplate;
@Autowired
private SimpUserRegistry userRegistry;
@Resource(name = "redisServiceImpl")
private RedisService redisService;
/** Send a message to a specific user */
@PostMapping("/sendToUser")
@ResponseBody
public String chat(HttpServletRequest request) {
String receiver = request.getParameter("receiver");
String msg = request.getParameter("msg");
HttpSession session = SpringContextUtils.getSession();
User loginUser = (User) session.getAttribute(Constants.SESSION_USER);
HelloMessage resultData = new HelloMessage(MessageFormat.format("{0} say: {1}", loginUser.getUsername(), msg));
this.sendToUser(loginUser.getUsername(), receiver, WebSocketChannelEnum.CHAT.getSubscribeUrl(), JsonUtils.toJson(resultData));
return "ok";
}
/** Send a message and handle the case where the receiver is offline */
private void sendToUser(String sender, String receiver, String destination, String payload) {
SimpUser simpUser = userRegistry.getUser(receiver);
if (simpUser != null && StringUtils.isNoneBlank(simpUser.getName())) {
messagingTemplate.convertAndSendToUser(receiver, destination, payload);
} else if (redisService.isSetMember(Constants.REDIS_WEBSOCKET_USER_SET, receiver)) {
RedisWebsocketMsg<String> redisWebsocketMsg = new RedisWebsocketMsg<>(receiver, WebSocketChannelEnum.CHAT.getCode(), payload);
redisService.convertAndSend(topicName, redisWebsocketMsg);
} else {
String listKey = Constants.REDIS_UNREAD_MSG_PREFIX + receiver + ":" + destination;
logger.info(MessageFormat.format("Receiver {0} has not established WebSocket, message from {1} will be stored in Redis list {2}", receiver, sender, listKey));
redisService.addToListRight(listKey, ExpireEnum.UNREAD_MSG, payload);
}
}
/** Pull unread messages for a specific destination */
@PostMapping("/pullUnreadMessage")
@ResponseBody
public Map<String, Object> pullUnreadMessage(String destination) {
Map<String, Object> result = new HashMap<>();
try {
HttpSession session = SpringContextUtils.getSession();
User loginUser = (User) session.getAttribute(Constants.SESSION_USER);
String listKey = Constants.REDIS_UNREAD_MSG_PREFIX + loginUser.getUsername() + ":" + destination;
List<Object> messageList = redisService.rangeList(listKey, 0, -1);
result.put("code", "200");
if (messageList != null && !messageList.isEmpty()) {
redisService.delete(listKey);
result.put("result", messageList);
}
} catch (Exception e) {
result.put("code", "500");
result.put("msg", e.getMessage());
}
return result;
}
}
5. WebSocket related configuration
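The configuration in this step sets three prefixes, and the user prefix in particular decides which destination a client must subscribe to in order to receive messages sent with `convertAndSendToUser`. The sketch below is a plain-Java model of that naming convention as commonly described for Spring's user destinations, not a call into Spring itself; `UserDestinations` and its two methods are hypothetical helpers.

```java
/** Models how user destinations are named when the user prefix is "/user/". */
class UserDestinations {

    /** Destination targeted on the server side for one user: prefix + user + destination. */
    static String serverSide(String userPrefix, String user, String destination) {
        String dest = destination.startsWith("/") ? destination : "/" + destination;
        // A "/" inside the user name is escaped so it cannot be mistaken for a path separator.
        return userPrefix + user.replace("/", "%2F") + dest;
    }

    /** Destination the client subscribes to; the user is implied by the client's own session. */
    static String clientSubscription(String userPrefix, String destination) {
        String prefix = userPrefix.endsWith("/")
                ? userPrefix.substring(0, userPrefix.length() - 1)
                : userPrefix;
        String dest = destination.startsWith("/") ? destination : "/" + destination;
        return prefix + dest;
    }

    public static void main(String[] args) {
        System.out.println(serverSide("/user/", "bob", "/topic/reply"));  // prints /user/bob/topic/reply
        System.out.println(clientSubscription("/user/", "/topic/reply")); // prints /user/topic/reply
    }
}
```

With the `CHAT` channel's subscription URL of `/topic/reply`, a client page would therefore be expected to subscribe to `/user/topic/reply`, while plain broadcasts would go to `/topic/...` destinations.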
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Autowired
private AuthHandshakeInterceptor authHandshakeInterceptor;
@Autowired
private MyHandshakeHandler myHandshakeHandler;
@Autowired
private MyChannelInterceptor myChannelInterceptor;
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/chat-websocket")
.addInterceptors(authHandshakeInterceptor)
.setHandshakeHandler(myHandshakeHandler)
.withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
// Client sends messages to /message/xxx
registry.setApplicationDestinationPrefixes("/message");
// Server broadcasts to /topic/yyy
registry.enableSimpleBroker("/topic");
// Prefix for messages to a specific user
registry.setUserDestinationPrefix("/user/");
}
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(myChannelInterceptor);
}
}
6. Sample HTML page
<head>
<meta charset="UTF-8"/>
<title>Chat With STOMP Message</title>
<script src="https://cdnjs.cloudflare.com/ajax/libs/jquery/3.3.1/jquery.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/sockjs-client/1.1.4/sockjs.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/stomp.js/2.3.3/stomp.min.js"></script>
... (CSS and JS omitted for brevity) ...
<script type="text/javascript">
var stompClient = null;
// UI logic for connect / disconnect / send message, pull unread messages, etc.
// (The original JavaScript code is kept unchanged.)
</script>
</head>
<body>
... (HTML form for target URL, receiver, message, and buttons) ...
</body>
Code Ape Tech Column
Former Ant Group P8 engineer, pure technologist, sharing full‑stack Java, job interview and career advice through a column. Site: java-family.cn