Implementing a Lightweight Redis‑Based Message Queue: Environment Setup, List, Pub/Sub, and Stream Solutions
This article details the research background, environment preparation, and three Redis‑based message‑queue implementations—using List structures, Pub/Sub, and Stream APIs—along with code examples, design considerations, testing strategies, and deployment tips for building a lightweight messaging component in Java Spring Boot.
Technical Research Background
The startup team has no mature operations support for a traditional MQ, so it needed a lightweight messaging solution. After some research, a Redis-based message queue was chosen because Redis was already deployed, community write-ups are plentiful, and the system's throughput requirements are low.
Basic Environment Setup
Deploy a Redis 6.0.6 container:
docker run -p 6379:6379 --name redis_6_0_6 -d redis:6.0.6
docker run pulls the image automatically when it is not present locally. To fetch it explicitly beforehand:
docker pull redis:6.0.6
Once the Redis instance is running, the following sections present three approaches to building a lightweight message queue.
Implementation Based on List Structure
The simplest approach uses Redis List: producers push messages to the left, consumers pop from the right.
package org.idea.mq.redis.framework.mq.list;

import com.alibaba.fastjson.JSON;
import org.idea.mq.redis.framework.bean.MsgWrapper;
import org.idea.mq.redis.framework.mq.IMQTemplate;
import org.idea.mq.redis.framework.redis.IRedisService;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;

@Component
public class RedisListMQTemplate implements IMQTemplate {

    @Resource
    private IRedisService iRedisService;

    @Override
    public boolean send(MsgWrapper msgWrapper) {
        try {
            // Serialize the payload and push it onto the left end of the List named after the topic.
            String json = JSON.toJSONString(msgWrapper.getMsgInfo());
            iRedisService.lpush(msgWrapper.getTopic(), json);
            return true;
        } catch (Exception e) {
            // The failure is only printed; the caller sees it as a false return value.
            e.printStackTrace();
        }
        return false;
    }
}
Problem Considerations
How can multiple services subscribe to the same message? Adopt a naming convention for the List key, such as user-service:member-upgrade-list.
Message listening mechanism: consumers can poll with RPOP, or use the blocking BRPOP to avoid busy polling and reduce CPU usage.
Reliability: use BRPOPLPUSH to atomically move each message into a backup list, so that it can be recovered if the consumer fails before it finishes processing.
Broadcast support: a List cannot broadcast, because each element is delivered to exactly one client.
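The reliability point can be illustrated without a Redis server. Below is a minimal in-memory sketch of the pattern behind BRPOPLPUSH: a message is moved into a backup list in the same step in which it is handed to the consumer, and is removed from that list only after processing succeeds. The class and method names (ReliableListQueue, takeForProcessing, ack, recoverUnacked) are hypothetical and not part of the article's framework. Against a real Redis instance the steps would roughly correspond to LPUSH, BRPOPLPUSH, and LREM on the backup list.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// In-memory stand-in for two Redis Lists: the main queue and a backup list.
// Hypothetical helper for illustration only; it does not talk to Redis.
class ReliableListQueue {
    private final Deque<String> queue = new ArrayDeque<>();
    private final Deque<String> processing = new ArrayDeque<>();

    // Producer side, analogous to LPUSH: add at the head (left).
    synchronized void push(String msg) {
        queue.addFirst(msg);
    }

    // Consumer side, analogous to RPOPLPUSH: take from the tail (right) and
    // record the message in the backup list in the same step.
    synchronized String takeForProcessing() {
        String msg = queue.pollLast();
        if (msg != null) {
            processing.addFirst(msg);
        }
        return msg; // null when the queue is empty
    }

    // Analogous to LREM on the backup list once processing has succeeded.
    synchronized boolean ack(String msg) {
        return processing.remove(msg);
    }

    // After a consumer crash, move unacknowledged messages back to the tail
    // of the main queue so they are redelivered in their original order.
    synchronized int recoverUnacked() {
        int moved = 0;
        String msg;
        while ((msg = processing.pollFirst()) != null) {
            queue.addLast(msg);
            moved++;
        }
        return moved;
    }

    synchronized int pendingCount() {
        return processing.size();
    }
}
```

A consumer would call takeForProcessing, handle the message, and then call ack. A separate recovery step, run on startup or on a timer, would call recoverUnacked for messages whose consumer died in between.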
Implementation Based on Pub/Sub
Redis Pub/Sub provides broadcast‑style messaging via publish and subscribe commands.
@Override
public boolean publish(String channel, String content) {
    try (Jedis jedis = iRedisFactory.getConnection()) {
        jedis.publish(channel, content);
        return true;
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

@Override
public boolean subscribe(JedisPubSub jedisPubSub, String... channel) {
    try (Jedis jedis = iRedisFactory.getConnection()) {
        // Blocks the calling thread until the listener unsubscribes.
        jedis.subscribe(jedisPubSub, channel);
        return true;
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
Because jedis.subscribe blocks the calling thread until the listener unsubscribes, run the subscription listener on a dedicated thread.
Problem Considerations
Reliability: Pub/Sub does not persist messages. Anything published while a subscriber is disconnected (for example during a network break) or while Redis is down is lost.
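The fire-and-forget behaviour described above can be modelled in a few lines. The following is an in-memory sketch, not Redis code: InMemoryPubSub is a hypothetical class that keeps only the current subscribers per channel and stores no messages, so a message published before anyone subscribes simply disappears. Returning the number of receivers mirrors the integer reply of the Redis PUBLISH command.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// In-memory model of fire-and-forget broadcast: nothing is stored per channel.
// Hypothetical helper for illustration only.
class InMemoryPubSub {
    private final Map<String, List<Consumer<String>>> subscribers = new ConcurrentHashMap<>();

    void subscribe(String channel, Consumer<String> handler) {
        subscribers.computeIfAbsent(channel, c -> new CopyOnWriteArrayList<>()).add(handler);
    }

    // Delivers the message to every current subscriber and returns how many
    // received it. With zero subscribers the message is dropped.
    int publish(String channel, String content) {
        List<Consumer<String>> handlers =
                subscribers.getOrDefault(channel, Collections.emptyList());
        handlers.forEach(h -> h.accept(content));
        return handlers.size();
    }
}
```

In this model every subscriber present at publish time gets the message (broadcast), and there is no way for a late or reconnecting subscriber to ask for what it missed.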
Implementation Based on Redis Stream
Redis 5.0 introduced Streams, which offer persistence, consumer groups, and acknowledgement-based delivery. The features relevant to a message queue are:
Message ID generation
Message traversal
Blocking and non‑blocking reads
Consumer‑group consumption
Pending‑message handling
Queue monitoring
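As an illustration of the first item, stream entry IDs have the form <millisecondsTime>-<sequenceNumber>, and auto-generated IDs always increase: entries added within the same millisecond get an incremented sequence number, and if the clock moves backwards the previous millisecond value is kept. The sketch below reproduces that rule locally. StreamIdGenerator is a hypothetical class written for this explanation; Redis generates these IDs itself on the server.

```java
// Local illustration of how "<millisecondsTime>-<sequenceNumber>" IDs stay
// strictly increasing. Hypothetical helper; Redis generates real IDs itself.
class StreamIdGenerator {
    private long lastMs = -1;
    private long lastSeq = 0;

    // nowMs is passed in (e.g. System.currentTimeMillis()) to keep the class testable.
    synchronized String next(long nowMs) {
        if (nowMs > lastMs) {
            // New millisecond: restart the sequence.
            lastMs = nowMs;
            lastSeq = 0;
        } else {
            // Same millisecond, or the clock went backwards: keep the last
            // timestamp and bump the sequence so the ID still increases.
            lastSeq++;
        }
        return lastMs + "-" + lastSeq;
    }
}
```

Because IDs are ordered, a consumer can resume traversal from the last ID it saw, which is what makes range reads and consumer-group cursors possible.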
Define a listener interface:
public interface RedisStreamMQListener {
    HandlerResult handleMsg(StreamEntry streamEntry);
}
Example listener implementation:
@StreamListener(streamName = "order-service:order-payed-stream", groupName = "order-service-group", consumerName = "user-service-consumer")
public class OrderPayedListener implements RedisStreamMQListener {

    @Resource
    private IRedisService iRedisService;

    @Override
    public HandlerResult handleMsg(StreamEntry streamEntry) {
        // The producer stores the serialized message under the "json" field.
        Map<String, String> map = streamEntry.getFields();
        String json = map.get("json");
        PayMsg payMsg = JSON.parseObject(json, PayMsg.class);
        System.out.println("pending payMsg is : " + payMsg);
        return SUCCESS; // assumed to be a statically imported HandlerResult value
    }
}
The custom @StreamListener annotation is itself annotated with @Component, so Spring registers every annotated listener as a bean.
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component
public @interface StreamListener {
    String streamName() default "";
    String groupName() default "";
    String consumerName() default "";
}
Once the application is ready, the configuration class starts two background threads per listener: one for normal stream consumption and one for pending-message handling.
public class StreamListenerConfiguration implements ApplicationListener<ApplicationReadyEvent> {

    @Resource
    private ApplicationContext applicationContext;

    @Resource
    private IRedisService iRedisService;

    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {
        // Collect every listener bean and read its stream, group, and consumer names from the annotation.
        Map<String, RedisStreamMQListener> beanMap = applicationContext.getBeansOfType(RedisStreamMQListener.class);
        beanMap.values().forEach(listener -> {
            StreamListener sl = listener.getClass().getAnnotation(StreamListener.class);
            ListenerInitWrapper wrapper = new ListenerInitWrapper(sl.streamName(), sl.groupName(), sl.consumerName());
            new Thread(new CoreMsgHandlerThread(wrapper, listener, iRedisService)).start();
            new Thread(new PendingMsgHandlerThread(wrapper, listener, iRedisService)).start();
        });
    }
    // CoreMsgHandlerThread and PendingMsgHandlerThread implementations omitted for brevity
}
Message publishing is a thin wrapper that writes the JSON payload to a stream and lets Redis generate the entry ID.
public class StreamProducer implements IStreamProducer {

    @Resource
    private IRedisService iRedisService;

    @Override
    public void sendMsg(String streamName, String json) {
        // Listeners read the payload back from the "json" field.
        Map<String, String> map = new HashMap<>();
        map.put("json", json);
        iRedisService.xAdd(streamName, map);
    }
}
Component Testing
Two microservice projects (user-service and order-service) each deploy multiple instances, and the two services belong to distinct consumer groups. Point-to-point tests verify that within a group only one instance receives a given message, while broadcast tests confirm that every consumer group receives its own copy of the same stream message.
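The two properties being tested can be stated precisely with a small in-memory model. The sketch below is not the article's framework and does not talk to Redis. InMemoryStream is a hypothetical class that keeps one delivery cursor and one pending set per group, roughly what XREADGROUP with the ">" ID and XACK do on the server. Groups are assumed to start reading from the beginning of the stream.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory model of consumer-group delivery. Hypothetical helper for illustration only.
class InMemoryStream {
    private final List<String> entries = new ArrayList<>();
    // Per group: index of the next entry not yet delivered to any consumer of that group.
    private final Map<String, Integer> groupCursor = new HashMap<>();
    // Per group: entry index -> consumer holding it unacknowledged.
    private final Map<String, Map<Integer, String>> pending = new HashMap<>();

    synchronized int add(String payload) {
        entries.add(payload);
        return entries.size() - 1;
    }

    // Assumption: every group starts at the first entry of the stream.
    synchronized void createGroup(String group) {
        groupCursor.putIfAbsent(group, 0);
        pending.putIfAbsent(group, new HashMap<>());
    }

    // Hands the next undelivered entry of this group to the calling consumer.
    // Returns the entry index, or -1 if the group has seen everything.
    // The group must have been created first.
    synchronized int readNext(String group, String consumer) {
        int cursor = groupCursor.get(group);
        if (cursor >= entries.size()) {
            return -1;
        }
        groupCursor.put(group, cursor + 1);
        pending.get(group).put(cursor, consumer);
        return cursor;
    }

    synchronized String payload(int index) {
        return entries.get(index);
    }

    // Removes the entry from the group's pending set, like an acknowledgement.
    synchronized boolean ack(String group, int index) {
        return pending.get(group).remove(index) != null;
    }

    synchronized int pendingCount(String group) {
        return pending.get(group).size();
    }
}
```

In this model a second instance in the same group gets -1 for an entry the first instance already received (point-to-point), while a different group still receives the same entry (broadcast across groups).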
Further Considerations
Why two threads for a single Stream? One handles normal consumption with ACK; the other processes pending messages that failed to ACK, preventing message loss.
Delay‑retry support is not yet implemented; current design may cause endless retries for failing messages.
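One possible way to bound the retries, sketched here as an assumption rather than as part of the published framework, is to cap the number of deliveries and back off exponentially between attempts. Redis records a delivery count for each pending entry (reported by XPENDING), which could feed such a policy. RetryPolicy and its parameters are hypothetical names, and what happens to a message once the limit is reached (for example, moving it to a separate dead-letter stream) is left open.

```java
// Decides how long to wait before redelivering a failed message, or whether
// to give up. Hypothetical helper; not part of the article's framework.
class RetryPolicy {
    private final int maxAttempts;
    private final long baseDelayMs;
    private final long maxDelayMs;

    RetryPolicy(int maxAttempts, long baseDelayMs, long maxDelayMs) {
        this.maxAttempts = maxAttempts;
        this.baseDelayMs = baseDelayMs;
        this.maxDelayMs = maxDelayMs;
    }

    // deliveryCount is the number of times the message has been delivered so far (>= 1).
    // Returns the delay in milliseconds before the next attempt, or -1 when the
    // message has used up its attempts and should no longer be retried.
    long nextDelayMs(int deliveryCount) {
        if (deliveryCount >= maxAttempts) {
            return -1;
        }
        // Exponential backoff: base, 2*base, 4*base, ... capped at maxDelayMs.
        int exponent = Math.min(Math.max(deliveryCount - 1, 0), 20);
        long delay = baseDelayMs * (1L << exponent);
        return Math.min(delay, maxDelayMs);
    }
}
```

The pending-message thread could consult such a policy before handing an entry back to the listener, skipping entries whose idle time is still shorter than the computed delay.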
Full source code is available at https://gitee.com/IdeaHome_admin/mq-framework .