Backend Development 17 min read

Distributed WebSocket Messaging with Redis and Kafka in Spring

This article explains how to enable cross‑server WebSocket communication in a distributed Spring application by using a message queue such as Redis or Kafka, detailing two architectural approaches, concrete code examples, configuration files, and a sample client page.

Code Ape Tech Column
Code Ape Tech Column
Code Ape Tech Column
Distributed WebSocket Messaging with Redis and Kafka in Spring

Last week a colleague on Knowledge Planet asked a question about WebSocket in a distributed environment: when Nginx load‑balances users across multiple servers, how can a user connected to one server send a message to a user connected to another server?

The answer is to implement distributed WebSocket, which can be achieved by either of the following two solutions:

Publish messages (containing userId and content ) to a message‑queue topic (e.g., Redis , Kafka ). Each application node subscribes to the topic, extracts the receiver’s user ID, checks whether the corresponding WebSocket connection exists locally, and forwards the message if it does.

After a user establishes a WebSocket connection, record the node where the connection resides in Redis. Then use a message queue to push messages directly to the node that holds the receiver’s connection (more complex but reduces network traffic).

Implementation Plan

The following demonstrates the first solution in detail.

1. Define a WebSocket Channel enum class

public enum WebSocketChannelEnum {
    // Simple point‑to‑point chat for testing
    CHAT("CHAT", "Simple point‑to‑point chat", "/topic/reply");

    WebSocketChannelEnum(String code, String description, String subscribeUrl) {
        this.code = code;
        this.description = description;
        this.subscribeUrl = subscribeUrl;
    }

    /** Unique CODE */
    private String code;
    /** Description */
    private String description;
    /** WebSocket client subscription URL */
    private String subscribeUrl;

    public String getCode() { return code; }
    public String getDescription() { return description; }
    public String getSubscribeUrl() { return subscribeUrl; }

    /** Find enum by CODE */
    public static WebSocketChannelEnum fromCode(String code) {
        if (StringUtils.isNoneBlank(code)) {
            for (WebSocketChannelEnum channelEnum : values()) {
                if (channelEnum.code.equals(code)) {
                    return channelEnum;
                }
            }
        }
        return null;
    }
}

2. Configure a Redis‑based message queue

In medium‑to‑large production projects, Redis is not recommended as a reliable message queue; consider using professional middleware such as Kafka or RabbitMQ instead.

@Configuration
@ConditionalOnClass({JedisCluster.class})
public class RedisConfig {

    @Value("${spring.redis.timeout}")
    private String timeOut;

    @Value("${spring.redis.cluster.nodes}")
    private String nodes;

    @Value("${spring.redis.cluster.max-redirects}")
    private int maxRedirects;

    @Value("${spring.redis.jedis.pool.max-active}")
    private int maxActive;

    @Value("${spring.redis.jedis.pool.max-wait}")
    private int maxWait;

    @Value("${spring.redis.jedis.pool.max-idle}")
    private int maxIdle;

    @Value("${spring.redis.jedis.pool.min-idle}")
    private int minIdle;

    @Value("${spring.redis.message.topic-name}")
    private String topicName;

    @Bean
    public JedisPoolConfig jedisPoolConfig() {
        JedisPoolConfig config = new JedisPoolConfig();
        config.setMaxTotal(maxActive);
        config.setMaxIdle(maxIdle);
        config.setMinIdle(minIdle);
        config.setMaxWaitMillis(maxWait);
        return config;
    }

    @Bean
    public RedisClusterConfiguration redisClusterConfiguration() {
        RedisClusterConfiguration configuration = new RedisClusterConfiguration(Arrays.asList(nodes));
        configuration.setMaxRedirects(maxRedirects);
        return configuration;
    }

    /** JedisConnectionFactory */
    @Bean
    public JedisConnectionFactory jedisConnectionFactory(RedisClusterConfiguration configuration, JedisPoolConfig jedisPoolConfig) {
        return new JedisConnectionFactory(configuration, jedisPoolConfig);
    }

    /** Use Jackson to serialize objects */
    @Bean
    public Jackson2JsonRedisSerializer
jackson2JsonRedisSerializer() {
        Jackson2JsonRedisSerializer
serializer = new Jackson2JsonRedisSerializer<>(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        serializer.setObjectMapper(objectMapper);
        return serializer;
    }

    /** RedisTemplate */
    @Bean
    public RedisTemplate
redisTemplate(JedisConnectionFactory factory, Jackson2JsonRedisSerializer
jackson2JsonRedisSerializer) {
        RedisTemplate
redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(factory);
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        redisTemplate.setKeySerializer(stringRedisSerializer);
        redisTemplate.setHashKeySerializer(stringRedisSerializer);
        redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
        redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }

    /** Message listener */
    @Bean
    MessageListenerAdapter messageListenerAdapter(MessageReceiver messageReceiver, Jackson2JsonRedisSerializer
jackson2JsonRedisSerializer) {
        MessageListenerAdapter adapter = new MessageListenerAdapter(messageReceiver, "receiveMessage");
        adapter.setSerializer(jackson2JsonRedisSerializer);
        return adapter;
    }

    /** Message listener container */
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter messageListenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(messageListenerAdapter, new PatternTopic(topicName));
        return container;
    }
}

Corresponding Spring configuration (application.yml) example:

spring:
  ...
  # redis
  redis:
    cluster:
      nodes: namenode22:6379,datanode23:6379,datanode24:6379
      max-redirects: 6
    timeout: 300000
    jedis:
      pool:
        max-active: 8
        max-wait: 100000
        max-idle: 8
        min-idle: 0
    # custom topic path
    message:
      topic-name: topic-test

3. Define a Redis message handler

@Component
public class MessageReceiver {
    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private SimpMessagingTemplate messagingTemplate;

    @Autowired
    private SimpUserRegistry userRegistry;

    /** Process WebSocket messages */
    public void receiveMessage(RedisWebsocketMsg redisWebsocketMsg) {
        logger.info(MessageFormat.format("Received Message: {0}", redisWebsocketMsg));
        // 1. Get the user and check if the connection exists on this node
        SimpUser simpUser = userRegistry.getUser(redisWebsocketMsg.getReceiver());
        if (simpUser != null && StringUtils.isNoneBlank(simpUser.getName())) {
            // 2. Get the subscription URL of the client
            WebSocketChannelEnum channelEnum = WebSocketChannelEnum.fromCode(redisWebsocketMsg.getChannelCode());
            if (channelEnum != null) {
                // 3. Send the message to the client
                messagingTemplate.convertAndSendToUser(redisWebsocketMsg.getReceiver(), channelEnum.getSubscribeUrl(), redisWebsocketMsg.getContent());
            }
        }
    }
}

4. Send WebSocket messages from a Controller

@Controller
@RequestMapping("/wsTemplate")
public class RedisMessageController {
    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Value("${spring.redis.message.topic-name}")
    private String topicName;

    @Autowired
    private SimpMessagingTemplate messagingTemplate;

    @Autowired
    private SimpUserRegistry userRegistry;

    @Resource(name = "redisServiceImpl")
    private RedisService redisService;

    /** Send a message to a specific user */
    @PostMapping("/sendToUser")
    @ResponseBody
    public String chat(HttpServletRequest request) {
        String receiver = request.getParameter("receiver");
        String msg = request.getParameter("msg");
        HttpSession session = SpringContextUtils.getSession();
        User loginUser = (User) session.getAttribute(Constants.SESSION_USER);
        HelloMessage resultData = new HelloMessage(MessageFormat.format("{0} say: {1}", loginUser.getUsername(), msg));
        this.sendToUser(loginUser.getUsername(), receiver, WebSocketChannelEnum.CHAT.getSubscribeUrl(), JsonUtils.toJson(resultData));
        return "ok";
    }

    /** Send a message and handle the case where the receiver is offline */
    private void sendToUser(String sender, String receiver, String destination, String payload) {
        SimpUser simpUser = userRegistry.getUser(receiver);
        if (simpUser != null && StringUtils.isNoneBlank(simpUser.getName())) {
            messagingTemplate.convertAndSendToUser(receiver, destination, payload);
        } else if (redisService.isSetMember(Constants.REDIS_WEBSOCKET_USER_SET, receiver)) {
            RedisWebsocketMsg
redisWebsocketMsg = new RedisWebsocketMsg<>(receiver, WebSocketChannelEnum.CHAT.getCode(), payload);
            redisService.convertAndSend(topicName, redisWebsocketMsg);
        } else {
            String listKey = Constants.REDIS_UNREAD_MSG_PREFIX + receiver + ":" + destination;
            logger.info(MessageFormat.format("Receiver {0} has not established WebSocket, message from {1} will be stored in Redis list {2}", receiver, sender, listKey));
            redisService.addToListRight(listKey, ExpireEnum.UNREAD_MSG, payload);
        }
    }

    /** Pull unread messages for a specific destination */
    @PostMapping("/pullUnreadMessage")
    @ResponseBody
    public Map
pullUnreadMessage(String destination) {
        Map
result = new HashMap<>();
        try {
            HttpSession session = SpringContextUtils.getSession();
            User loginUser = (User) session.getAttribute(Constants.SESSION_USER);
            String listKey = Constants.REDIS_UNREAD_MSG_PREFIX + loginUser.getUsername() + ":" + destination;
            List
messageList = redisService.rangeList(listKey, 0, -1);
            result.put("code", "200");
            if (messageList != null && !messageList.isEmpty()) {
                redisService.delete(listKey);
                result.put("result", messageList);
            }
        } catch (Exception e) {
            result.put("code", "500");
            result.put("msg", e.getMessage());
        }
        return result;
    }
}

5. WebSocket related configuration

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Autowired
    private AuthHandshakeInterceptor authHandshakeInterceptor;

    @Autowired
    private MyHandshakeHandler myHandshakeHandler;

    @Autowired
    private MyChannelInterceptor myChannelInterceptor;

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/chat-websocket")
                .addInterceptors(authHandshakeInterceptor)
                .setHandshakeHandler(myHandshakeHandler)
                .withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // Client sends messages to /message/xxx
        registry.setApplicationDestinationPrefixes("/message");
        // Server broadcasts to /topic/yyy
        registry.enableSimpleBroker("/topic");
        // Prefix for messages to a specific user
        registry.setUserDestinationPrefix("/user/");
    }

    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.interceptors(myChannelInterceptor);
    }
}

6. Sample HTML page

<head>
    <meta charset="UTF-8"/>
    <title>Chat With STOMP Message</title>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/jquery/3.3.1/jquery.min.js"></script>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/sockjs-client/1.1.4/sockjs.min.js"></script>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/stomp.js/2.3.3/stomp.min.js"></script>
    ... (CSS and JS omitted for brevity) ...
    <script type="text/javascript">
        var stompClient = null;
        // UI logic for connect / disconnect / send message, pull unread messages, etc.
        // (The original JavaScript code is kept unchanged.)
    </script>
</head>
<body>
    ... (HTML form for target URL, receiver, message, and buttons) ...
</body>

Finally, the author adds a friendly reminder to support the work by following the public account and replying with keywords to obtain related PDF collections (Spring Cloud, Spring Boot, MyBatis). Likes, shares, and collections are also encouraged.

backendRedisSpringKafkaWebSocketMessage Queuedistributed
Code Ape Tech Column
Written by

Code Ape Tech Column

Former Ant Group P8 engineer, pure technologist, sharing full‑stack Java, job interview and career advice through a column. Site: java-family.cn

0 followers
Reader feedback

How this landed with the community

login Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.