How to Build Distributed WebSocket Messaging with Spring, Redis, and Kafka

This article explains how to enable cross‑node WebSocket communication in a distributed Spring application by using a message queue (Redis or Kafka) to broadcast messages, tracking user connections with Redis, and providing a complete step‑by‑step implementation with code samples and configuration details.

Architect
Architect
Architect
How to Build Distributed WebSocket Messaging with Spring, Redis, and Kafka

Problem Statement

In a distributed environment where Nginx load‑balances requests across multiple application servers, a user who establishes a WebSocket connection on one server cannot directly send messages to a user connected on another server. The challenge is to deliver WebSocket messages across server nodes.

Solution Approaches

Publish each message (containing userId and content ) to a shared message‑queue topic (e.g., Redis or Kafka ). Every application node subscribes to the topic, extracts the receiver’s user ID, checks whether the corresponding WebSocket session exists locally, and forwards the message if the user is connected on that node. If the user is not present, the message is discarded because the node that owns the session will handle it.

After a user establishes a WebSocket connection, record the node identifier in a Redis cache. When sending a message, push it to the queue and let the target node (identified via the cache) consume the message. This approach reduces network traffic but adds complexity.

Implementation (Approach 1)

1. Define a WebSocket channel enum

public enum WebSocketChannelEnum {
    // Simple point‑to‑point chat for demonstration
    CHAT("CHAT", "Simple chat", "/topic/reply");

    private String code;
    private String description;
    private String subscribeUrl;

    WebSocketChannelEnum(String code, String description, String subscribeUrl) {
        this.code = code;
        this.description = description;
        this.subscribeUrl = subscribeUrl;
    }

    public String getCode() { return code; }
    public String getDescription() { return description; }
    public String getSubscribeUrl() { return subscribeUrl; }

    /** Find enum by code */
    public static WebSocketChannelEnum fromCode(String code) {
        if (StringUtils.isNoneBlank(code)) {
            for (WebSocketChannelEnum e : values()) {
                if (e.code.equals(code)) {
                    return e;
                }
            }
        }
        return null;
    }
}

2. Configure a Redis‑based message queue

For production‑grade projects, Redis is not recommended for reliable messaging; Kafka or RabbitMQ should be used instead.

@Configuration
@ConditionalOnClass({JedisCluster.class})
public class RedisConfig {
    @Value("${spring.redis.timeout}")
    private String timeOut;
    @Value("${spring.redis.cluster.nodes}")
    private String nodes;
    @Value("${spring.redis.cluster.max-redirects}")
    private int maxRedirects;
    @Value("${spring.redis.jedis.pool.max-active}")
    private int maxActive;
    @Value("${spring.redis.jedis.pool.max-wait}")
    private int maxWait;
    @Value("${spring.redis.jedis.pool.max-idle}")
    private int maxIdle;
    @Value("${spring.redis.jedis.pool.min-idle}")
    private int minIdle;
    @Value("${spring.redis.message.topic-name}")
    private String topicName;

    @Bean
    public JedisPoolConfig jedisPoolConfig() {
        JedisPoolConfig config = new JedisPoolConfig();
        config.setMaxTotal(maxActive);
        config.setMaxIdle(maxIdle);
        config.setMinIdle(minIdle);
        config.setMaxWaitMillis(maxWait);
        return config;
    }

    @Bean
    public RedisClusterConfiguration redisClusterConfiguration() {
        RedisClusterConfiguration configuration = new RedisClusterConfiguration(Arrays.asList(nodes));
        configuration.setMaxRedirects(maxRedirects);
        return configuration;
    }

    /** JedisConnectionFactory */
    @Bean
    public JedisConnectionFactory jedisConnectionFactory(RedisClusterConfiguration configuration, JedisPoolConfig jedisPoolConfig) {
        return new JedisConnectionFactory(configuration, jedisPoolConfig);
    }

    /** Jackson serializer for objects */
    @Bean
    public Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer() {
        Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<>(Object.class);
        ObjectMapper mapper = new ObjectMapper();
        mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        serializer.setObjectMapper(mapper);
        return serializer;
    }

    /** RedisTemplate */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(JedisConnectionFactory factory, Jackson2JsonRedisSerializer<Object> serializer) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);
        StringRedisSerializer stringSerializer = new StringRedisSerializer();
        template.setKeySerializer(stringSerializer);
        template.setHashKeySerializer(stringSerializer);
        template.setValueSerializer(serializer);
        template.setHashValueSerializer(serializer);
        template.afterPropertiesSet();
        return template;
    }

    /** Message listener */
    @Bean
    public MessageListenerAdapter messageListenerAdapter(MessageReceiver receiver, Jackson2JsonRedisSerializer<Object> serializer) {
        MessageListenerAdapter adapter = new MessageListenerAdapter(receiver, "receiveMessage");
        adapter.setSerializer(serializer);
        return adapter;
    }

    /** Listener container */
    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter adapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(adapter, new PatternTopic(topicName));
        return container;
    }
}

Relevant application.yml fragment:

spring:
  redis:
    cluster:
      nodes: namenode22:6379,datanode23:6379,datanode24:6379
      max-redirects: 6
    timeout: 300000
    jedis:
      pool:
        max-active: 8
        max-wait: 100000
        max-idle: 8
        min-idle: 0
    message:
      topic-name: topic-test

3. Implement a Redis message handler

@Component
public class MessageReceiver {
    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private SimpMessagingTemplate messagingTemplate;

    @Autowired
    private SimpUserRegistry userRegistry;

    /** Process incoming WebSocket messages */
    public void receiveMessage(RedisWebsocketMsg redisWebsocketMsg) {
        logger.info(MessageFormat.format("Received Message: {0}", redisWebsocketMsg));
        // 1. Retrieve the target user and check if the session exists on this node
        SimpUser simpUser = userRegistry.getUser(redisWebsocketMsg.getReceiver());
        if (simpUser != null && StringUtils.isNoneBlank(simpUser.getName())) {
            // 2. Resolve the channel enum
            WebSocketChannelEnum channelEnum = WebSocketChannelEnum.fromCode(redisWebsocketMsg.getChannelCode());
            if (channelEnum != null) {
                // 3. Send the payload to the client
                messagingTemplate.convertAndSendToUser(redisWebsocketMsg.getReceiver(), channelEnum.getSubscribeUrl(), redisWebsocketMsg.getContent());
            }
        }
    }
}

4. Controller for sending messages

@Controller
@RequestMapping("/wsTemplate")
public class RedisMessageController {
    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Value("${spring.redis.message.topic-name}")
    private String topicName;

    @Autowired
    private SimpMessagingTemplate messagingTemplate;

    @Autowired
    private SimpUserRegistry userRegistry;

    @Resource(name = "redisServiceImpl")
    private RedisService redisService;

    /** Send a chat message to a specific user */
    @PostMapping("/sendToUser")
    @ResponseBody
    public String chat(HttpServletRequest request) {
        String receiver = request.getParameter("receiver");
        String msg = request.getParameter("msg");
        HttpSession session = SpringContextUtils.getSession();
        User loginUser = (User) session.getAttribute(Constants.SESSION_USER);
        HelloMessage payload = new HelloMessage(MessageFormat.format("{0} say: {1}", loginUser.getUsername(), msg));
        this.sendToUser(loginUser.getUsername(), receiver, WebSocketChannelEnum.CHAT.getSubscribeUrl(), JsonUtils.toJson(payload));
        return "ok";
    }

    /** Core sending logic handling online/offline cases */
    private void sendToUser(String sender, String receiver, String destination, String payload) {
        SimpUser simpUser = userRegistry.getUser(receiver);
        if (simpUser != null && StringUtils.isNoneBlank(simpUser.getName())) {
            // Receiver is connected to this node
            messagingTemplate.convertAndSendToUser(receiver, destination, payload);
        } else if (redisService.isSetMember(Constants.REDIS_WEBSOCKET_USER_SET, receiver)) {
            // Receiver is online on another node; push via Redis
            RedisWebsocketMsg<String> msg = new RedisWebsocketMsg<>(receiver, WebSocketChannelEnum.CHAT.getCode(), payload);
            redisService.convertAndSend(topicName, msg);
        } else {
            // Receiver offline; store for later retrieval
            String listKey = Constants.REDIS_UNREAD_MSG_PREFIX + receiver + ":" + destination;
            logger.info(MessageFormat.format("User {0} offline, storing message {1} to Redis list {2}", receiver, payload, listKey));
            redisService.addToListRight(listKey, ExpireEnum.UNREAD_MSG, payload);
        }
    }

    /** Pull unread messages for a specific destination */
    @PostMapping("/pullUnreadMessage")
    @ResponseBody
    public Map<String, Object> pullUnreadMessage(String destination) {
        Map<String, Object> result = new HashMap<>();
        try {
            HttpSession session = SpringContextUtils.getSession();
            User loginUser = (User) session.getAttribute(Constants.SESSION_USER);
            String listKey = Constants.REDIS_UNREAD_MSG_PREFIX + loginUser.getUsername() + ":" + destination;
            List<Object> messages = redisService.rangeList(listKey, 0, -1);
            result.put("code", "200");
            if (messages != null && !messages.isEmpty()) {
                redisService.delete(listKey);
                result.put("result", messages);
            }
        } catch (Exception e) {
            result.put("code", "500");
            result.put("msg", e.getMessage());
        }
        return result;
    }
}

5. WebSocket configuration

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    @Autowired
    private AuthHandshakeInterceptor authHandshakeInterceptor;

    @Autowired
    private MyHandshakeHandler myHandshakeHandler;

    @Autowired
    private MyChannelInterceptor myChannelInterceptor;

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/chat-websocket")
                .addInterceptors(authHandshakeInterceptor)
                .setHandshakeHandler(myHandshakeHandler)
                .withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // Client sends to /message/**
        registry.setApplicationDestinationPrefixes("/message");
        // Server broadcasts to /topic/**
        registry.enableSimpleBroker("/topic");
        // User‑specific destination prefix
        registry.setUserDestinationPrefix("/user/");
    }

    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.interceptors(myChannelInterceptor);
    }
}

6. Front‑end demo page

<head>
    <meta charset="UTF-8"/>
    <title>Chat With STOMP Message</title>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/jquery/3.3.1/jquery.min.js"></script>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/sockjs-client/1.1.4/sockjs.min.js"></script>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/stomp.js/2.3.3/stomp.min.js"></script>
    <style>
        #connect-container { margin: 0 auto; width: 400px; }
        #connect-container div { padding:5px; margin:0 7px 10px 0; }
        .message input { padding:5px; margin:0 7px 10px 0; }
        .layui-btn { display:inline-block; }
    </style>
    <script type="text/javascript">
        var stompClient = null;
        $(function(){
            var target = $("#target");
            if (window.location.protocol === 'http:') {
                target.val('http://' + window.location.host + target.val());
            } else {
                target.val('https://' + window.location.host + target.val());
            }
        });
        function setConnected(connected){
            var connect = $("#connect");
            var disconnect = $("#disconnect");
            var echo = $("#echo");
            if (connected){
                connect.addClass("layui-btn-disabled");
                disconnect.removeClass("layui-btn-disabled");
                echo.removeClass("layui-btn-disabled");
            } else {
                connect.removeClass("layui-btn-disabled");
                disconnect.addClass("layui-btn-disabled");
                echo.addClass("layui-btn-disabled");
            }
            connect.attr("disabled", connected);
            disconnect.attr("disabled", !connected);
            echo.attr("disabled", !connected);
        }
        function connect(){
            var target = $("#target").val();
            var ws = new SockJS(target);
            stompClient = Stomp.over(ws);
            stompClient.connect({}, function(){
                setConnected(true);
                console.log('Info: STOMP connection opened.');
                pullUnreadMessage("/topic/reply");
                stompClient.subscribe("/user/topic/reply", function(response){
                    console.log(JSON.parse(response.body).content);
                });
            }, function(){
                setConnected(false);
                console.log('Info: STOMP connection closed.');
            });
        }
        function disconnect(){
            if (stompClient != null){
                stompClient.disconnect();
                stompClient = null;
            }
            setConnected(false);
            console.log('Info: STOMP connection closed.');
        }
        function sendMessage(){
            if (stompClient != null){
                var receiver = $("#receiver").val();
                var msg = $("#message").val();
                console.log('Sent: ' + JSON.stringify({receiver:receiver, msg:msg}));
                $.ajax({
                    url: "/wsTemplate/sendToUser",
                    type: "POST",
                    dataType: "json",
                    async: true,
                    data: {receiver:receiver, msg:msg}
                });
            } else {
                alert('STOMP connection not established, please connect.');
            }
        }
        function pullUnreadMessage(destination){
            $.ajax({
                url: "/wsTemplate/pullUnreadMessage",
                type: "POST",
                dataType: "json",
                async: true,
                data: {destination:destination},
                success: function(data){
                    if (data.result != null){
                        data.result.forEach(function(item){
                            console.log(JSON.parse(item).content);
                        });
                    } else if (data.code != null && data.code == "500") {
                        alert(data.msg);
                    }
                }
            });
        }
    </script>
</head>
<body>
    <div id="connect-container" class="layui-elem-field">
        <legend>Chat With STOMP Message</legend>
        <div>
            <input id="target" type="text" class="layui-input" size="40" style="width:350px" value="/chat-websocket"/>
        </div>
        <div>
            <button id="connect" class="layui-btn layui-btn-normal" onclick="connect();">Connect</button>
            <button id="disconnect" class="layui-btn layui-btn-normal layui-btn-disabled" disabled onclick="disconnect();">Disconnect</button>
        </div>
        <div class="message">
            <input id="receiver" type="text" class="layui-input" size="40" style="width:350px" placeholder="Receiver name"/>
            <input id="message" type="text" class="layui-input" size="40" style="width:350px" placeholder="Message content"/>
        </div>
        <div>
            <button id="echo" class="layui-btn layui-btn-normal layui-btn-disabled" disabled onclick="sendMessage();">Send Message</button>
        </div>
    </div>
</body>

The article walks through the entire process: defining a channel enum, configuring a Redis (or Kafka) message broker, implementing a message receiver, exposing REST endpoints for sending and pulling messages, configuring Spring’s WebSocket broker, and providing a minimal HTML/JavaScript client that demonstrates connection, message sending, and offline‑message retrieval.

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

Distributed SystemsJavaredisspringKafkaWebSocket
Architect
Written by

Architect

Professional architect sharing high‑quality architecture insights. Topics include high‑availability, high‑performance, high‑stability architectures, big data, machine learning, Java, system and distributed architecture, AI, and practical large‑scale architecture case studies. Open to ideas‑driven architects who enjoy sharing and learning.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.