Flexible Multi-MQ Component Update: Switch Between RocketMQ, Redis, Kafka, and RabbitMQ
This article explains why adopting a message queue improves microservice decoupling and clarifies responsibility boundaries, then details the design of a pluggable Base‑MQ component: the @MQEventListener annotation, the MQEvent base class, the configuration holder, and the MQClient interface. It follows with concrete implementations for RocketMQ, Kafka, Redis and RabbitMQ, an example event flow, and guidance for extending the framework.
Motivation and Design Choice
Team members raised the usual concerns about adopting a message queue (MQ): resource consumption, operational cost, synchronous call semantics, and distributed‑transaction handling. The chosen design balances these concerns by using Redis publish/subscribe as the default implementation while keeping the architecture open to RocketMQ, Kafka, and RabbitMQ.
Core Abstractions
@MQEventListener – a lightweight method‑level annotation that specifies group, topic and tags. It is independent of any concrete MQ implementation.
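The article does not show the annotation's source, so the following is only a minimal sketch of what it might look like, together with the kind of reflective scan that the discovery step in MQClient.init() would need. Only the attribute names group, topic and tags come from the article; the default values, the DemoHandlers class and the ListenerScanner helper are assumptions made for illustration.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Hypothetical shape of the annotation: only group, topic and tags come from the article.
@Retention(RetentionPolicy.RUNTIME) // must stay visible to reflection at runtime
@Target(ElementType.METHOD)
@interface MQEventListener {
    String group() default "DEFAULT"; // assumed default
    String topic() default "DEFAULT"; // assumed default
    String tags() default "";         // assumed: empty means "no tag filter"
}

// Hypothetical handler class used only to exercise the scan.
class DemoHandlers {
    @MQEventListener(topic = "mall", tags = "shop_sync")
    public void onShopSync(Object event) { }

    public void notAListener(Object event) { }
}

// Describes every annotated method as "group/topic/tags -> methodName".
class ListenerScanner {
    static List<String> scan(Class<?> type) {
        List<String> found = new ArrayList<>();
        for (Method m : type.getDeclaredMethods()) {
            MQEventListener a = m.getAnnotation(MQEventListener.class);
            if (a == null) continue; // plain methods are ignored
            found.add(a.group() + "/" + a.topic() + "/" + a.tags() + " -> " + m.getName());
        }
        return found;
    }
}
```

Because the annotation carries nothing but strings, the same handler method can be bound to any of the four brokers without change.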
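MQEvent – an abstract base class for all events. It defines the fields msgId, topic, tag and tenantId, plus a set of overloaded publish() methods that delegate to a publisher (a Consumer<MQEvent>) registered in BaseContext under the key MQEventPublisher. If no publisher has been registered, publish() does nothing.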
public abstract class MQEvent implements Serializable {
protected String msgId;
protected String topic;
protected String tag;
protected String tenantId;
public void publish() { this.publish(getTopic(), getTag(), getTenantId()); }
public void publish(String topic) { this.publish(topic, getTag(), getTenantId()); }
public void publish(String topic, String tag) { this.publish(topic, tag, getTenantId()); }
public void publish(String topic, String tag, String tenantId) {
setTopic(topic);
if (getTopic() == null) {
setTopic(SpringContext.getEnv().getProperty("base-mq.default-topic", "DEFAULT"));
}
setTag(tag);
setTenantId(tenantId != null ? tenantId : ThreadContext.get(ContextConstants.TENANT_ID));
// A random UUID avoids collisions between events created in the same millisecond.
setMsgId(getMsgId() == null ? java.util.UUID.randomUUID().toString() : getMsgId());
if (BaseContext.contains("MQEventPublisher")) {
BaseContext.<String, Consumer<MQEvent>>get("MQEventPublisher").accept(this);
}
}
public <T extends MQEvent> T tenantId(String tenantId) { this.tenantId = tenantId; return (T) this; }
}
BaseMQProperties – configuration holder used by all implementations.
public class BaseMQProperties {
private boolean enable = false;
private String impl = "none"; // kafka|rocket|redis|rabbit
private String server = "";
private String namespace = "";
private boolean persist = false;
private String serialization = "JsonMQEventSerialization";
private boolean autoAck = false;
private int retries = 0;
private String username = "";
private String password = "";
private String producerGroup = "DEFAULT";
private String defaultTopic = "DEFAULT";
private String exchange = "";
// getters & setters omitted for brevity
}
MQClient – pluggable interface that each concrete MQ implementation must satisfy.
public interface MQClient {
String impl();
default BaseMQProperties config() { return SpringContext.getBean(BaseMQProperties.class); }
default void init() {
if (!config().isEnable() || !Objects.equals(config().getImpl(), impl())) return;
Logger log = LoggerFactory.getLogger("### BASE-MQ : " + impl() + "Client ###");
// The guard above already guarantees that this client is the configured implementation.
if (!BaseContext.contains("MQEventPublisher")) {
Consumer<MQEvent> producer = initProducer();
if (producer != null) BaseContext.inject("MQEventPublisher", producer);
}
// discover @MQEventListener methods and build MQListener objects (omitted for brevity)
// initConsumer for each listener (omitted for brevity)
}
Consumer<MQEvent> initProducer();
void initConsumer(MQListener listener) throws Exception;
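void start();
// No-op by default; clients that hold connections override it to release them on shutdown.
default void destroy() throws Exception { }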
default MQEventSerialization serialization() { return SpringContext.getBean(config().getSerialization()); }
default void consume(MQListener listener, MQEvent event) {
Logger log = LoggerFactory.getLogger("### BASE-MQ : " + impl() + "Client ###");
log.info("Consume MQ: {}", serialization().serialize(event));
if (config().isPersist()) {
try { SpringContext.getBean(MQEventStorer.class).store(event.getTopic(), event); }
catch (Exception e) { log.error("Persist MQ failed: {}", serialization().serialize(event), e); }
}
listener.getConsumer().accept(event);
}
}
Example Event Flow
A ShopSyncEvent extends MQEvent to represent a shop‑creation notification. The producer (mall service) builds the event and calls publish(). Two consumer services (IOT and leasing) annotate a handler method with @MQEventListener(topic="mall", tags="shop_sync") to receive the event.
@Data @Builder @AllArgsConstructor @NoArgsConstructor
public class ShopSyncEvent extends MQEvent {
private String shopId;
private String shopName;
private JSONObject shopInfo;
@Override public String getTopic() { return "mall"; }
@Override public String getTag() { return "shop_sync"; }
}
// Producer
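// build() is required first: the Lombok builder itself has no tenantId() or publish() method.
ShopSyncEvent.builder().shopId(id).shopName(name).shopInfo(new JSONObject(info))
.build().tenantId(tenantId).publish();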
// Consumer
@MQEventListener(topic = "mall", tags = "shop_sync")
public void onShopSync(ShopSyncEvent event) {
log.info("Shop sync to leasing/IOT: {}", event.getShopInfo());
// business logic
}
Concrete MQClient Implementations
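Each client maps the namespace, topic and tag onto broker-native names in its own way. As a rough sketch, the hypothetical helper below (Destinations is not part of the component) gathers the naming conventions used by the Redis, Kafka and RabbitMQ clients in one place. It assumes that config().namespace(sep) returns the namespace followed by the separator, or an empty string when no namespace is configured. RocketMQ needs no such mapping because it supports topics and tags natively.

```java
// Hypothetical helper that mirrors the naming conventions of the clients in this article.
final class Destinations {
    private Destinations() { }

    // Empty when no namespace is configured, so producer and consumer build the same name.
    static String prefix(String namespace, String sep) {
        return (namespace == null || namespace.isEmpty()) ? "" : namespace + sep;
    }

    // Empty when the event carries no tag.
    private static String suffix(String tag, String sep) {
        return (tag == null || tag.isEmpty()) ? "" : sep + tag;
    }

    // Redis pub/sub channel: [namespace:]topic[:tag]
    static String redisChannel(String namespace, String topic, String tag) {
        return prefix(namespace, ":") + topic + suffix(tag, ":");
    }

    // RabbitMQ routing key: [namespace.]topic[.tag]
    static String rabbitRoutingKey(String namespace, String topic, String tag) {
        return prefix(namespace, ".") + topic + suffix(tag, ".");
    }

    // Kafka topic: [namespace_]topic; tags are filtered on the consumer side instead.
    static String kafkaTopic(String namespace, String topic) {
        return prefix(namespace, "_") + topic;
    }
}
```

Whatever the convention, the producer and the consumer must derive the name in the same way, including the case where the namespace is empty.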
RocketMQ
@Slf4j // Lombok: provides the log field used below
public final class RocketClient implements MQClient {
private static Map<String, Supplier<DefaultMQPushConsumer>> LISTENERS = new ConcurrentHashMap<>();
@Override public String impl() { return "rocket"; }
@Override public Consumer<MQEvent> initProducer() {
try {
DefaultMQProducer producer;
if (!config().getUsername().isEmpty() && !config().getPassword().isEmpty()) {
producer = new DefaultMQProducer(config().getNamespace(), config().getProducerGroup(),
new AclClientRPCHook(new SessionCredentials(config().getUsername(), config().getPassword())));
} else {
producer = new DefaultMQProducer(config().getNamespace(), config().getProducerGroup());
}
producer.setNamesrvAddr(config().getServer());
producer.start();
return ev -> {
String msg = serialization().serialize(ev);
// StandardCharsets avoids the checked UnsupportedEncodingException and matches the consumer's decoding.
Message m = new Message(ev.getTopic(), ev.getTag() == null ? "" : ev.getTag(), msg.getBytes(StandardCharsets.UTF_8));
try { producer.send(m); } catch (Exception e) { log.error("Publish MQ failed", e); }
};
} catch (MQClientException e) { log.error("Create Rocket producer failed", e); }
return null;
}
@Override public void initConsumer(MQListener l) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(config().getNamespace(), l.getGroup());
consumer.setNamesrvAddr(config().getServer());
consumer.subscribe(l.getTopic(), l.getTags());
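// The cast selects the concurrent overload; a bare lambda is ambiguous with the orderly one.
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, ctx) -> {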
for (MessageExt me : msgs) {
MQEvent ev = serialization().deserialize(new String(me.getBody(), StandardCharsets.UTF_8), l.getEventClass());
consume(l, ev);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
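// Key by group, topic and tags so that two listeners on the same topic do not overwrite each other.
LISTENERS.put(l.getGroup() + ":" + l.getTopic() + ":" + l.getTags(), () -> consumer);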
}
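@Override public void start() {
LISTENERS.values().forEach(s -> {
// start() declares the checked MQClientException, which a forEach lambda cannot propagate.
try { s.get().start(); }
catch (MQClientException e) { throw new IllegalStateException("Start Rocket consumer failed", e); }
});
}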
}
Kafka
// Besides the original imports this needs org.apache.kafka.common.errors.WakeupException
// and java.util.concurrent.CopyOnWriteArrayList.
@Slf4j // Lombok: provides the log field used below
public final class KafkaClient implements MQClient {
// Starters launch one polling task per listener; consumers are tracked separately for shutdown.
private static final List<Runnable> LISTENERS = new ArrayList<>();
private static final List<KafkaConsumer<String,String>> CONSUMERS = new CopyOnWriteArrayList<>();
@Override public String impl() { return "kafka"; }
@Override public Consumer<MQEvent> initProducer() {
Properties p = new Properties();
p.put("bootstrap.servers", config().getServer());
p.put("acks", "all");
p.put("retries", config().getRetries());
p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String,String> prod = new KafkaProducer<>(p);
return ev -> {
String msg = serialization().serialize(ev);
prod.send(new ProducerRecord<>(config().namespace("_") + ev.getTopic(), msg));
};
}
@Override public void initConsumer(MQListener l) throws Exception {
Properties p = new Properties();
p.put("bootstrap.servers", config().getServer());
p.put("group.id", l.getGroup());
p.put("enable.auto.commit", config().isAutoAck());
p.put("auto.offset.reset", "earliest");
p.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
p.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String,String> consumer = new KafkaConsumer<>(p);
consumer.subscribe(Collections.singletonList(config().namespace("_") + l.getTopic()));
CONSUMERS.add(consumer);
LISTENERS.add(() -> GlobalThreadPool.submit(() -> {
try {
while (true) {
ConsumerRecords<String,String> recs = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String,String> r : recs) {
try {
MQEvent ev = serialization().deserialize(r.value(), l.getEventClass());
// Kafka has no native tags, so filtering happens here on the consumer side.
if (MQFilter.matchExps(ev.getTag(), l.getTags())) consume(l, ev);
} catch (Exception e) { log.error("Consume MQ failed: {}", r.value(), e); }
}
// commitSync() without arguments commits every offset returned by the last poll(),
// so it is called once per batch rather than once per record.
if (!config().isAutoAck() && !recs.isEmpty()) consumer.commitSync();
}
} catch (WakeupException e) {
// destroy() called wakeup(); fall through and close on the polling thread.
} finally { consumer.close(); }
}));
}
@Override public void start() { LISTENERS.forEach(Runnable::run); }
// KafkaConsumer is not thread-safe; wakeup() is the one call that is safe from another thread.
@Override public void destroy() throws Exception { CONSUMERS.forEach(KafkaConsumer::wakeup); }
}
Redis
@Slf4j // Lombok: provides the log field used below
public final class RedisClient implements MQClient {
private static JedisPool JEDIS_POOL;
private static List<Supplier<Void>> LISTENERS = new ArrayList<>();
@Override public String impl() { return "redis"; }
@Override public Consumer<MQEvent> initProducer() {
return ev -> {
String msg = serialization().serialize(ev);
String channel = String.format("%s%s%s",
config().getNamespace().isEmpty() ? "" : config().getNamespace() + ":",
ev.getTopic(),
(ev.getTag() == null || ev.getTag().isEmpty()) ? "" : ":" + ev.getTag());
GlobalThreadPool.submit(() -> {
try (Jedis j = jedis()) { j.publish(channel, msg); }
catch (Throwable t) { log.error("Publish MQ to [{}] failed", channel, t); }
});
};
}
@Override public void initConsumer(MQListener l) throws Exception {
// Build the prefix exactly as the producer does, so an empty namespace does not yield ":topic".
String prefix = config().getNamespace().isEmpty() ? "" : config().getNamespace() + ":";
List<String> channels = new ArrayList<>();
if (l.getTags() != null && !l.getTags().isEmpty()) {
for (String tag : MQFilter.findIncludes(l.getTags())) {
channels.add(prefix + l.getTopic() + ":" + tag);
}
} else {
channels.add(prefix + l.getTopic());
}
String lockScript = "if redis.call('get', KEYS[1]) == false then redis.call('setex', KEYS[1], tonumber(ARGV[1]), KEYS[1]) return 1 else return 0 end";
LISTENERS.add(() -> {
GlobalThreadPool.submit(() -> {
try (Jedis j = jedis()) {
j.subscribe(new JedisPubSub() {
@Override public void onMessage(String channel, String message) {
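try {
MQEvent ev = serialization().deserialize(message, l.getEventClass());
String lockKey = String.format("ChannelGroupLock:%s:%s:%s", channel, l.getGroup(), ev.getMsgId());
// A connection in subscribe mode cannot run other commands, so borrow a second one for the lock.
long acquired;
try (Jedis lock = jedis()) { acquired = (long) lock.eval(lockScript, 1, lockKey, "10000"); }
// Only the first instance of a group to set the key handles the message.
if (acquired == 1) {
consume(l, ev);
}
} catch (Exception e) { log.error("Consume MQ failed", e); }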
}
}, channels.toArray(new String[0]));
}
});
return null;
});
}
@Override public void start() { LISTENERS.forEach(Supplier::get); }
@Override public void destroy() throws Exception { if (JEDIS_POOL != null) JEDIS_POOL.close(); }
private Jedis jedis() {
// Lazy initialisation is guarded because publisher and subscriber tasks call this from several pool threads.
synchronized (RedisClient.class) {
if (JEDIS_POOL == null) {
String[] hostPort = config().getServer().split(":");
JedisPoolConfig pc = new JedisPoolConfig();
pc.setMaxTotal(100);
pc.setMaxIdle(50);
pc.setMinIdle(10);
pc.setTestOnBorrow(true);
if (!config().getPassword().isEmpty()) {
JEDIS_POOL = new JedisPool(pc, hostPort[0], Integer.parseInt(hostPort[1]), 5000, config().getPassword());
} else {
JEDIS_POOL = new JedisPool(pc, hostPort[0], Integer.parseInt(hostPort[1]));
}
}
}
return JEDIS_POOL.getResource();
}
}
RabbitMQ
@Slf4j // Lombok: provides the log field used below
public final class RabbitClient implements MQClient {
private static Connection CONNECTION;
private static List<Supplier<Channel>> LISTENERS = new ArrayList<>();
@Override public String impl() { return "rabbit"; }
@Override public Consumer<MQEvent> initProducer() {
try {
Channel ch = connection().createChannel();
return ev -> {
String msg = serialization().serialize(ev);
String routingKey = config().namespace(".") + ev.getTopic() +
(ev.getTag() == null || ev.getTag().isEmpty() ? "" : "." + ev.getTag());
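// Encode explicitly as UTF-8 to match the consumer's decoding instead of relying on the platform default.
try { ch.basicPublish(config().getExchange(), routingKey, null, msg.getBytes(StandardCharsets.UTF_8)); }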
catch (Exception e) { log.error("Publish MQ failed", e); }
};
} catch (IOException e) { throw new RuntimeException(e); }
}
@Override public void initConsumer(MQListener l) throws Exception {
String queue = l.getGroup() + "." + config().namespace(".") +
l.getListenerMethod().getDeclaringClass().getSimpleName() + "." + l.getListenerMethod().getName();
List<String> routingKeys = new ArrayList<>();
if (l.getTags() != null && !l.getTags().isEmpty()) {
for (String tag : MQFilter.findIncludes(l.getTags())) {
routingKeys.add(config().namespace(".") + l.getTopic() + "." + tag);
}
} else {
routingKeys.add(config().namespace(".") + l.getTopic());
}
// No try-with-resources here: closing the channel at the end of this method would cancel the consumer.
// The channel is closed in destroy() instead.
Channel ch = connection().createChannel();
ch.queueDeclare(queue, true, false, false, null);
for (String rk : routingKeys) { ch.queueBind(queue, config().getExchange(), rk); }
DeliverCallback cb = (ctag, delivery) -> {
String msg = new String(delivery.getBody(), StandardCharsets.UTF_8);
try {
MQEvent ev = serialization().deserialize(msg, l.getEventClass());
if (MQFilter.matchExps(ev.getTag(), l.getTags())) consume(l, ev);
// Filtered-out messages are acknowledged too; otherwise they would stay unacked on the channel.
if (!config().isAutoAck()) ch.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
} catch (Exception e) { log.error("Consume MQ failed: {}", msg, e); }
};
ch.basicConsume(queue, config().isAutoAck(), cb, consumerTag -> {});
LISTENERS.add(() -> ch);
}
@Override public void start() { LISTENERS.forEach(Supplier::get); }
@Override public void destroy() throws Exception {
for (Supplier<Channel> s : LISTENERS) { s.get().close(); }
if (CONNECTION != null) CONNECTION.close();
}
private Connection connection() {
if (CONNECTION == null) {
ConnectionFactory factory = new ConnectionFactory();
String[] hp = config().getServer().split(":");
factory.setHost(hp[0]);
factory.setPort(Integer.parseInt(hp[1]));
factory.setUsername(config().getUsername());
factory.setPassword(config().getPassword());
try { CONNECTION = factory.newConnection(); } catch (IOException | TimeoutException e) { throw new RuntimeException(e); }
}
return CONNECTION;
}
}
Overall Package Structure
config – contains BaseMQProperties and the auto‑configuration class that collects all MQClient beans, calls init() and start().
core – defines the core abstractions (MQClient, MQEvent, @MQEventListener, MQListener, serialization, storage, filtering).
impl – provides the JSON serializer and the four concrete client implementations (Rocket, Kafka, Redis, Rabbit).
The component can be switched between implementations by setting base-mq.impl (e.g., rocket, kafka, redis, rabbit) and providing the corresponding connection properties. Adding a new MQ simply requires implementing MQClient and registering the bean.
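To make the extension point concrete, here is a minimal in-process sketch of the same pattern: a client identifies itself with an impl() key, and only the client whose key matches the configured value is used. The types SimpleEvent, SimpleClient, MemoryClient and ClientSelector are simplified stand-ins invented for this illustration. They are not the component's real MQEvent and MQClient, and the "memory" key is hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;

// Simplified stand-in for MQEvent: just a topic and a payload.
final class SimpleEvent {
    final String topic;
    final String payload;
    SimpleEvent(String topic, String payload) { this.topic = topic; this.payload = payload; }
}

// Simplified stand-in for MQClient: only what is needed to show selection by key.
interface SimpleClient {
    String impl();
    Consumer<SimpleEvent> initProducer();
    void initConsumer(String topic, Consumer<SimpleEvent> handler);
}

// Hypothetical in-process client: delivers events synchronously to handlers of the same topic.
final class MemoryClient implements SimpleClient {
    private final Map<String, List<Consumer<SimpleEvent>>> handlers = new HashMap<>();

    @Override public String impl() { return "memory"; }

    @Override public Consumer<SimpleEvent> initProducer() {
        return ev -> handlers.getOrDefault(ev.topic, Collections.emptyList())
                             .forEach(h -> h.accept(ev));
    }

    @Override public void initConsumer(String topic, Consumer<SimpleEvent> handler) {
        handlers.computeIfAbsent(topic, t -> new ArrayList<>()).add(handler);
    }
}

// Mirrors the guard in MQClient.init(): only the client matching the configured key is used.
final class ClientSelector {
    static SimpleClient select(List<SimpleClient> clients, String configuredImpl) {
        for (SimpleClient c : clients) {
            if (Objects.equals(c.impl(), configuredImpl)) return c;
        }
        throw new IllegalArgumentException("No MQ client registered for impl=" + configuredImpl);
    }
}
```

A real implementation would additionally need to handle serialization, tag filtering and shutdown, as the four clients above do.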
Architect's Journey
E‑commerce, SaaS, AI architect; DDD enthusiast; SKILL enthusiast
