
Implementing a Lightweight Redis‑Based Message Queue: Environment Setup, List, Pub/Sub, and Stream Solutions

This article details the research background, environment preparation, and three Redis‑based message‑queue implementations—using List structures, Pub/Sub, and Stream APIs—along with code examples, design considerations, testing strategies, and deployment tips for building a lightweight messaging component in Java Spring Boot.


Technical Research Background

The startup team has no mature operations setup for a traditional MQ, so it needs a lightweight messaging solution. After some research, a Redis-based message queue was chosen for three reasons: Redis is already part of the infrastructure, community articles on the topic are plentiful, and the system's throughput requirements are low.

Basic Environment Setup

Deploy a Redis 6.0.6 container:

docker run -p 6379:6379 --name redis_6_0_6 -d redis:6.0.6

docker run pulls the image automatically when it is missing locally; to fetch it ahead of time, pull it explicitly:

docker pull redis:6.0.6

With the Redis instance running, the following sections introduce three ways to build a lightweight message queue.

Implementation Based on List Structure

The simplest approach uses Redis List: producers push messages to the left, consumers pop from the right.

package org.idea.mq.redis.framework.mq.list;
import com.alibaba.fastjson.JSON;
import org.idea.mq.redis.framework.bean.MsgWrapper;
import org.idea.mq.redis.framework.mq.IMQTemplate;
import org.idea.mq.redis.framework.redis.IRedisService;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;

@Component
public class RedisListMQTemplate implements IMQTemplate {
    @Resource
    private IRedisService iRedisService;
    @Override
    public boolean send(MsgWrapper msgWrapper) {
        try {
            String json = JSON.toJSONString(msgWrapper.getMsgInfo());
            iRedisService.lpush(msgWrapper.getTopic(), json);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }
}

Problem Considerations

How can multiple services subscribe to the same message? Use a naming convention for the List key, such as user-service:member-upgrade-list.

Message listening mechanism: poll with rpop, or use the blocking brpop so that an idle consumer does not burn CPU.

Reliability: use BRPOPLPUSH, which pops a message and pushes it onto a backup list in one atomic step, so a copy survives if the consumer fails.

Broadcast support: a List cannot broadcast, because each element is delivered to exactly one client.
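The reliability point above can be illustrated without a running Redis. The following is a minimal in-memory sketch, not Redis client code: two ArrayDeque instances stand in for the work list and the backup list, and the method names only imitate LPUSH, BRPOPLPUSH (without blocking or timeout) and LREM. The class name ReliableListQueueModel and the requeueUnacked recovery step are assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/**
 * In-memory model of the "reliable queue" pattern. Assumption: it only
 * imitates LPUSH / BRPOPLPUSH / LREM semantics and does not talk to Redis.
 */
class ReliableListQueueModel {
    private final Deque<String> queue = new ArrayDeque<>();      // the work list
    private final Deque<String> processing = new ArrayDeque<>(); // the backup list

    /** LPUSH: the producer adds at the head (left). */
    void lpush(String msg) {
        queue.addFirst(msg);
    }

    /**
     * BRPOPLPUSH without the blocking part: take from the tail (right) of the
     * work list and put the same element at the head of the backup list.
     * Returns null when the work list is empty.
     */
    String rpoplpush() {
        String msg = queue.pollLast();
        if (msg != null) {
            processing.addFirst(msg);
        }
        return msg;
    }

    /** LREM on the backup list: called once the message has been handled. */
    boolean ack(String msg) {
        return processing.remove(msg);
    }

    /**
     * Hypothetical recovery step after a consumer crash: move every
     * unacknowledged message back so that the oldest is consumed first.
     */
    int requeueUnacked() {
        int moved = 0;
        String msg;
        // The head of the backup list is the newest entry; appending in that
        // order leaves the oldest entry at the tail, which is consumed next.
        while ((msg = processing.pollFirst()) != null) {
            queue.addLast(msg);
            moved++;
        }
        return moved;
    }

    /** Snapshot of the messages that were popped but not yet acknowledged. */
    List<String> pending() {
        return new ArrayList<>(processing);
    }
}
```

In this model a message leaves the backup list only when ack is called, so a consumer that dies between rpoplpush and ack leaves a copy behind that a recovery job can requeue. The price is at-least-once delivery: a message can be handled twice, so handlers should be idempotent.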

Implementation Based on Pub/Sub

Redis Pub/Sub provides broadcast‑style messaging via publish and subscribe commands.

@Override
public boolean publish(String channel, String content) {
    try (Jedis jedis = iRedisFactory.getConnection()) {
        jedis.publish(channel, content);
        return true;
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
@Override
public boolean subscribe(JedisPubSub jedisPubSub, String... channel) {
    try (Jedis jedis = iRedisFactory.getConnection()) {
        jedis.subscribe(jedisPubSub, channel);
        return true;
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

Because jedis.subscribe blocks the calling thread until the subscription ends, the subscription listener should run on a dedicated thread.

Problem Considerations

Reliability: Pub/Sub does not persist messages. Anything published while a subscriber is disconnected, or lost in a network break or Redis crash, is gone for good.
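The fire-and-forget behaviour described above can be sketched with a small in-memory model. This is an illustration only, not Redis client code: the class PubSubModel and its callback-based subscribe method are assumptions, and the sketch keeps only the two properties that matter here, namely that publish reaches every current subscriber and stores nothing for later ones.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/**
 * In-memory model of fire-and-forget Pub/Sub. Assumption: it imitates the
 * delivery semantics only and is not Redis client code.
 */
class PubSubModel {
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

    /** SUBSCRIBE: register a callback for one channel. */
    void subscribe(String channel, Consumer<String> listener) {
        subscribers.computeIfAbsent(channel, c -> new ArrayList<>()).add(listener);
    }

    /**
     * PUBLISH: hand the message to every subscriber registered right now and
     * return how many received it. Nothing is stored for later subscribers.
     */
    int publish(String channel, String message) {
        List<Consumer<String>> current = subscribers.getOrDefault(channel, List.of());
        current.forEach(listener -> listener.accept(message));
        return current.size();
    }
}
```

A message published before anyone subscribes returns 0 and is never seen again, which is the loss scenario above; once two listeners are registered, a single publish reaches both, which is the broadcast behaviour a List cannot provide.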

Implementation Based on Redis Stream

Redis 5.0 introduced Streams, which offer persistence, consumer groups, and reliable delivery. The features relevant to a message queue are:

Message ID generation

Message traversal

Blocking and non‑blocking reads

Consumer‑group consumption

Pending‑message handling

Queue monitoring
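To make the first item in the list above concrete: an auto-generated stream ID has the form <millisecondsTime>-<sequenceNumber>, and IDs within a stream are strictly increasing. The sketch below mirrors that rule in plain Java under stated assumptions; the class StreamIdGenerator is hypothetical, takes the clock value as a parameter so it can be exercised deterministically, and is not derived from the Redis source.

```java
/**
 * Sketch of how auto-generated stream IDs ("<millis>-<sequence>") stay
 * strictly increasing. Assumption: this mirrors the documented rule only.
 */
class StreamIdGenerator {
    private long lastMillis = 0;
    private long lastSeq = 0;

    /** Returns the next ID for the given wall-clock time in milliseconds. */
    synchronized String next(long nowMillis) {
        if (nowMillis > lastMillis) {
            lastMillis = nowMillis; // new millisecond: sequence restarts at 0
            lastSeq = 0;
        } else {
            lastSeq++;              // same millisecond, or the clock moved backwards
        }
        return lastMillis + "-" + lastSeq;
    }

    /** Compares two IDs numerically, first by millis and then by sequence. */
    static int compare(String a, String b) {
        String[] x = a.split("-");
        String[] y = b.split("-");
        int byMillis = Long.compare(Long.parseLong(x[0]), Long.parseLong(y[0]));
        return byMillis != 0
                ? byMillis
                : Long.compare(Long.parseLong(x[1]), Long.parseLong(y[1]));
    }
}
```

The numeric comparison matters when application code sorts or range-checks IDs: comparing them as plain strings would put 1000-10 before 1000-9.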

Define a listener interface:

public interface RedisStreamMQListener {
    HandlerResult handleMsg(StreamEntry streamEntry);
}

Example listener implementation:

@StreamListener(streamName = "order-service:order-payed-stream", groupName = "order-service-group", consumerName = "user-service-consumer")
public class OrderPayedListener implements RedisStreamMQListener {
    @Resource
    private IRedisService iRedisService;
    @Override
    public HandlerResult handleMsg(StreamEntry streamEntry) {
        Map<String, String> map = streamEntry.getFields();
        String json = map.get("json");
        PayMsg payMsg = JSON.parseObject(json, PayMsg.class);
        System.out.println("pending payMsg is : " + payMsg);
        return SUCCESS;
    }
}

The custom @StreamListener annotation is meta-annotated with @Component, so every class that carries it is registered as a Spring bean.

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component
public @interface StreamListener {
    String streamName() default "";
    String groupName() default "";
    String consumerName() default "";
}

Configuration starts two background threads after the Spring context is ready: one for normal stream consumption, another for pending‑message handling.

public class StreamListenerConfiguration implements ApplicationListener<ApplicationReadyEvent> {
    @Resource
    private ApplicationContext applicationContext;
    @Resource
    private IRedisService iRedisService;
    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {
        Map<String, RedisStreamMQListener> beanMap = applicationContext.getBeansOfType(RedisStreamMQListener.class);
        beanMap.values().forEach(listener -> {
            StreamListener sl = listener.getClass().getAnnotation(StreamListener.class);
            ListenerInitWrapper wrapper = new ListenerInitWrapper(sl.streamName(), sl.groupName(), sl.consumerName());
            new Thread(new CoreMsgHandlerThread(wrapper, listener, iRedisService)).start();
            new Thread(new PendingMsgHandlerThread(wrapper, listener, iRedisService)).start();
        });
    }
    // CoreMsgHandlerThread and PendingMsgHandlerThread implementations omitted for brevity
}

Message publishing is a thin wrapper that writes JSON to a stream using Redis' auto‑generated ID.

public class StreamProducer implements IStreamProducer {
    @Resource
    private IRedisService iRedisService;
    @Override
    public void sendMsg(String streamName, String json) {
        Map<String, String> map = new HashMap<>();
        map.put("json", json);
        iRedisService.xAdd(streamName, map);
    }
}

Component Testing

Two microservice projects (user-service and order-service) each run multiple instances, and the two services belong to distinct consumer groups. Point-to-point tests verify that within a group only one instance receives a given message, while broadcast tests confirm that groups with different names each receive the same stream message.
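The two properties under test can be modelled in a few lines. The following is an in-memory sketch, not Redis client code: it imitates only the delivery rule of reading a stream through a consumer group with the special ">" ID, where each group keeps its own position and all consumers in a group share it. The class ConsumerGroupModel, its method names, and the group and instance names used with it are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * In-memory model of stream consumer groups. Assumption: it imitates only the
 * delivery rule of XREADGROUP with the ">" ID and is not Redis client code.
 */
class ConsumerGroupModel {
    private final List<String> stream = new ArrayList<>();               // appended entries
    private final Map<String, Integer> groupCursor = new HashMap<>();    // next index per group
    private final Map<String, List<String>> delivered = new HashMap<>(); // per group/consumer

    /** XADD: append an entry to the stream. */
    void add(String entry) {
        stream.add(entry);
    }

    /** XGROUP CREATE: in this model a new group starts at the first entry. */
    void createGroup(String group) {
        groupCursor.putIfAbsent(group, 0);
    }

    /**
     * Read one entry that was never delivered to this group, or null if none
     * is left. The cursor is shared by all consumers of the group, so each
     * entry goes to exactly one of them.
     */
    String readNext(String group, String consumer) {
        Integer cursor = groupCursor.get(group);
        if (cursor == null) {
            throw new IllegalStateException("no such group: " + group);
        }
        if (cursor >= stream.size()) {
            return null;
        }
        String entry = stream.get(cursor);
        groupCursor.put(group, cursor + 1);
        delivered.computeIfAbsent(group + "/" + consumer, k -> new ArrayList<>()).add(entry);
        return entry;
    }

    /** Entries handed to one consumer of one group so far. */
    List<String> deliveredTo(String group, String consumer) {
        return delivered.getOrDefault(group + "/" + consumer, List.of());
    }
}
```

With two entries in the stream, two instances of one group each get a different entry (point-to-point), while a second group reading the same stream starts again from the first entry (broadcast across groups).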

Further Considerations

Why two threads for a single Stream? One handles normal consumption and ACKs each message it processes; the other re-processes pending messages, that is, messages that were delivered but never ACKed, so that they are not lost.

Delayed retry is not yet implemented, so in the current design a message that keeps failing may be retried endlessly.
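One possible way to bound the retries is sketched below. It is not part of the original framework: the class RetryPolicy, its parameters, and the dead-letter outcome are all hypothetical. It also assumes the pending-message thread can obtain a per-message delivery count, which the extended form of XPENDING reports for each pending entry.

```java
/**
 * Hypothetical retry policy for the pending-message thread: bounded attempts
 * with capped exponential backoff. Not part of the original framework.
 */
class RetryPolicy {
    enum Action { RETRY, DEAD_LETTER }

    private final int maxDeliveries;
    private final long baseDelayMillis;
    private final long maxDelayMillis;

    RetryPolicy(int maxDeliveries, long baseDelayMillis, long maxDelayMillis) {
        if (maxDeliveries < 1 || baseDelayMillis < 0 || maxDelayMillis < baseDelayMillis) {
            throw new IllegalArgumentException("invalid retry settings");
        }
        this.maxDeliveries = maxDeliveries;
        this.baseDelayMillis = baseDelayMillis;
        this.maxDelayMillis = maxDelayMillis;
    }

    /** deliveryCount: how many times the message has been delivered so far. */
    Action decide(long deliveryCount) {
        return deliveryCount >= maxDeliveries ? Action.DEAD_LETTER : Action.RETRY;
    }

    /** Delay before the next attempt: base * 2^(deliveryCount - 1), capped. */
    long delayMillis(long deliveryCount) {
        long exponent = Math.max(0, deliveryCount - 1);
        if (exponent >= 62) {
            return maxDelayMillis; // the shift below would overflow
        }
        long factor = 1L << exponent;
        if (baseDelayMillis != 0 && factor > maxDelayMillis / baseDelayMillis) {
            return maxDelayMillis; // the product would exceed the cap
        }
        return Math.min(maxDelayMillis, baseDelayMillis * factor);
    }
}
```

With new RetryPolicy(3, 1000, 10000), a message is retried after its first and second delivery and routed to the dead-letter outcome after the third. What that outcome does (for example, moving the entry to a separate stream and ACKing the original) is left open here.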

Full source code is available at https://gitee.com/IdeaHome_admin/mq-framework
