Flexible Multi-MQ Component Update: Switch Between RocketMQ, Redis, Kafka, and RabbitMQ

The article explains why adopting a message‑queue improves microservice decoupling and responsibility boundaries, then details the design of a pluggable Base‑MQ component—including @MQEventListener, MQEvent base class, configuration, and MQClient interface—followed by concrete implementations for RocketMQ, Kafka, Redis and RabbitMQ, an example event flow, and guidance for extending the framework.

Architect's Journey
Architect's Journey
Architect's Journey
Flexible Multi-MQ Component Update: Switch Between RocketMQ, Redis, Kafka, and RabbitMQ

Motivation and Design Choice

Team members raised typical concerns about adopting a message‑queue (MQ): resource consumption, operational cost, synchronous call semantics, and distributed‑transaction handling. The solution chosen balances these concerns by using Redis publish/subscribe as the default implementation while keeping the architecture open to RocketMQ, Kafka, and RabbitMQ.

Core Abstractions

@MQEventListener – a lightweight method‑level annotation that specifies group, topic and tags. It is independent of any concrete MQ implementation.

MQEvent – an abstract base class for all events. It defines fields msgId, topic, tag, tenantId and a set of overloaded publish() methods that delegate to a publisher bean stored in BaseContext.

public abstract class MQEvent implements Serializable {
    protected String msgId;
    protected String topic;
    protected String tag;
    protected String tenantId;
    public void publish() { this.publish(getTopic(), getTag(), getTenantId()); }
    public void publish(String topic) { this.publish(topic, getTag(), getTenantId()); }
    public void publish(String topic, String tag) { this.publish(topic, tag, getTenantId()); }
    public void publish(String topic, String tag, String tenantId) {
        setTopic(topic);
        if (getTopic() == null) {
            setTopic(SpringContext.getEnv().getProperty("base-mq.default-topic", "DEFAULT"));
        }
        setTag(tag);
        setTenantId(tenantId != null ? tenantId : ThreadContext.get(ContextConstants.TENANT_ID));
        setMsgId(getMsgId() == null ? String.valueOf(System.currentTimeMillis()) : getMsgId());
        if (BaseContext.contains("MQEventPublisher")) {
            BaseContext.<String, Consumer<MQEvent>>get("MQEventPublisher").accept(this);
        }
    }
    public <T extends MQEvent> T tenantId(String tenantId) { this.tenantId = tenantId; return (T) this; }
}

BaseMQProperties – configuration holder used by all implementations.

public class BaseMQProperties {
    private boolean enable = false;
    private String impl = "none"; // kafka|rocket|redis|rabbit
    private String server = "";
    private String namespace = "";
    private boolean persist = false;
    private String serialization = "JsonMQEventSerialization";
    private boolean autoAck = false;
    private int retries = 0;
    private String username = "";
    private String password = "";
    private String producerGroup = "DEFAULT";
    private String defaultTopic = "DEFAULT";
    private String exchange = "";
    // getters & setters omitted for brevity
}

MQClient – pluggable interface that each concrete MQ implementation must satisfy.

public interface MQClient {
    String impl();
    default BaseMQProperties config() { return SpringContext.getBean(BaseMQProperties.class); }
    default void init() {
        if (!config().isEnable() || !Objects.equals(config().getImpl(), impl())) return;
        Logger log = LoggerFactory.getLogger("### BASE-MQ : " + impl() + "Client ###");
        if (Objects.equals(config().getImpl(), impl()) && !BaseContext.contains("MQEventPublisher")) {
            Consumer<MQEvent> producer = initProducer();
            if (producer != null) BaseContext.inject("MQEventPublisher", producer);
        }
        // discover @MQEventListener methods and build MQListener objects (omitted for brevity)
        // initConsumer for each listener (omitted for brevity)
    }
    Consumer<MQEvent> initProducer();
    void initConsumer(MQListener listener) throws Exception;
    void start();
    default MQEventSerialization serialization() { return SpringContext.getBean(config().getSerialization()); }
    default void consume(MQListener listener, MQEvent event) {
        Logger log = LoggerFactory.getLogger("### BASE-MQ : " + impl() + "Client ###");
        log.info("Consume MQ: {}", serialization().serialize(event));
        if (config().isPersist()) {
            try { SpringContext.getBean(MQEventStorer.class).store(event.getTopic(), event); }
            catch (Exception e) { log.error("Persist MQ failed: {}", serialization().serialize(event), e); }
        }
        listener.getConsumer().accept(event);
    }
}

Example Event Flow

A ShopSyncEvent extends MQEvent to represent a shop‑creation notification. The producer (mall service) builds the event and calls publish(). Two consumer services (IOT and leasing) annotate a handler method with @MQEventListener(topic="mall", tags="shop_sync") to receive the event.

@Data @Builder @AllArgsConstructor @NoArgsConstructor
public class ShopSyncEvent extends MQEvent {
    private String shopId;
    private String shopName;
    private JSONObject shopInfo;
    @Override public String getTopic() { return "mall"; }
    @Override public String getTag()   { return "shop_sync"; }
}

// Producer
ShopSyncEvent.builder().shopId(id).shopName(name).shopInfo(new JSONObject(info))
    .tenantId(tenantId).publish();

// Consumer
@MQEventListener(topic = "mall", tags = "shop_sync")
public void onShopSync(ShopSyncEvent event) {
    log.info("Shop sync to leasing/IOT: {}", event.getShopInfo());
    // business logic
}

Concrete MQClient Implementations

RocketMQ

public final class RocketClient implements MQClient {
    private static Map<String, Supplier<DefaultMQPushConsumer>> LISTENERS = new ConcurrentHashMap<>();
    @Override public String impl() { return "rocket"; }
    @Override public Consumer<MQEvent> initProducer() {
        try {
            DefaultMQProducer producer;
            if (!config().getUsername().isEmpty() && !config().getPassword().isEmpty()) {
                producer = new DefaultMQProducer(config().getNamespace(), config().getProducerGroup(),
                    new AclClientRPCHook(new SessionCredentials(config().getUsername(), config().getPassword())));
            } else {
                producer = new DefaultMQProducer(config().getNamespace(), config().getProducerGroup());
            }
            producer.setNamesrvAddr(config().getServer());
            producer.start();
            return ev -> {
                String msg = serialization().serialize(ev);
                Message m = new Message(ev.getTopic(), ev.getTag() == null ? "" : ev.getTag(), msg.getBytes(RemotingHelper.DEFAULT_CHARSET));
                try { producer.send(m); } catch (Exception e) { log.error("Publish MQ failed", e); }
            };
        } catch (MQClientException e) { log.error("Create Rocket producer failed", e); }
        return null;
    }
    @Override public void initConsumer(MQListener l) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(config().getNamespace(), l.getGroup());
        consumer.setNamesrvAddr(config().getServer());
        consumer.subscribe(l.getTopic(), l.getTags());
        consumer.registerMessageListener((msgs, ctx) -> {
            for (MessageExt me : msgs) {
                MQEvent ev = serialization().deserialize(new String(me.getBody(), StandardCharsets.UTF_8), l.getEventClass());
                consume(l, ev);
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        LISTENERS.put(l.getTopic(), () -> consumer);
    }
    @Override public void start() { LISTENERS.values().forEach(s -> s.get().start()); }
}

Kafka

public final class KafkaClient implements MQClient {
    private static List<Supplier<KafkaConsumer<String,String>>> LISTENERS = new ArrayList<>();
    @Override public String impl() { return "kafka"; }
    @Override public Consumer<MQEvent> initProducer() {
        Properties p = new Properties();
        p.put("bootstrap.servers", config().getServer());
        p.put("acks", "all");
        p.put("retries", config().getRetries());
        p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String,String> prod = new KafkaProducer<>(p);
        return ev -> {
            String msg = serialization().serialize(ev);
            prod.send(new ProducerRecord<>(config().namespace("_") + ev.getTopic(), msg));
        };
    }
    @Override public void initConsumer(MQListener l) throws Exception {
        Properties p = new Properties();
        p.put("bootstrap.servers", config().getServer());
        p.put("group.id", l.getGroup());
        p.put("enable.auto.commit", config().isAutoAck());
        p.put("auto.offset.reset", "earliest");
        p.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        p.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String,String> consumer = new KafkaConsumer<>(p);
        consumer.subscribe(Collections.singletonList(config().namespace("_") + l.getTopic()));
        LISTENERS.add(() -> {
            GlobalThreadPool.submit(() -> {
                while (true) {
                    ConsumerRecords<String,String> recs = consumer.poll(Duration.ofMillis(1000));
                    for (ConsumerRecord<String,String> r : recs) {
                        MQEvent ev = serialization().deserialize(r.value(), l.getEventClass());
                        if (!MQFilter.matchExps(ev.getTag(), l.getTags())) continue;
                        try { consume(l, ev); if (!config().isAutoAck()) consumer.commitSync(); }
                        catch (Exception e) { log.error("Consume MQ failed", ev, e); }
                    }
                }
            });
            return consumer;
        });
    }
    @Override public void start() { LISTENERS.forEach(Supplier::get); }
    @Override public void destroy() throws Exception { LISTENERS.forEach(s -> s.get().close()); }
}

Redis

public final class RedisClient implements MQClient {
    private static JedisPool JEDIS_POOL;
    private static List<Supplier<Void>> LISTENERS = new ArrayList<>();
    @Override public String impl() { return "redis"; }
    @Override public Consumer<MQEvent> initProducer() {
        return ev -> {
            String msg = serialization().serialize(ev);
            String channel = String.format("%s%s%s",
                config().getNamespace().isEmpty() ? "" : config().getNamespace() + ":",
                ev.getTopic(),
                (ev.getTag() == null || ev.getTag().isEmpty()) ? "" : ":" + ev.getTag());
            GlobalThreadPool.submit(() -> {
                try (Jedis j = jedis()) { j.publish(channel, msg); }
                catch (Throwable t) { log.error("Publish MQ to [{}] failed", channel, t); }
            });
        };
    }
    @Override public void initConsumer(MQListener l) throws Exception {
        List<String> channels = new ArrayList<>();
        if (l.getTags() != null && !l.getTags().isEmpty()) {
            for (String tag : MQFilter.findIncludes(l.getTags())) {
                channels.add(config().getNamespace() + ":" + l.getTopic() + ":" + tag);
            }
        } else {
            channels.add(config().getNamespace() + ":" + l.getTopic());
        }
        String lockScript = "if redis.call('get', KEYS[1]) == false then redis.call('setex', KEYS[1], tonumber(ARGV[1]), KEYS[1]) return 1 else return 0 end";
        LISTENERS.add(() -> {
            GlobalThreadPool.submit(() -> {
                try (Jedis j = jedis()) {
                    j.subscribe(new JedisPubSub() {
                        @Override public void onMessage(String channel, String message) {
                            try {
                                MQEvent ev = serialization().deserialize(message, l.getEventClass());
                                String lockKey = String.format("ChannelGroupLock:%s:%s:%s", channel, l.getGroup(), ev.getMsgId());
                                if ((long) j.eval(lockScript, 1, lockKey, "10000") == 1) {
                                    consume(l, ev);
                                }
                            } catch (Exception e) { log.error("Consume MQ failed", e); }
                        }
                    }, channels.toArray(new String[0]));
                }
            });
            return null;
        });
    }
    @Override public void start() { LISTENERS.forEach(Supplier::get); }
    @Override public void destroy() throws Exception { if (JEDIS_POOL != null) JEDIS_POOL.close(); }
    private Jedis jedis() {
        if (JEDIS_POOL == null) {
            String[] hostPort = config().getServer().split(":");
            JedisPoolConfig pc = new JedisPoolConfig();
            pc.setMaxTotal(100);
            pc.setMaxIdle(50);
            pc.setMinIdle(10);
            pc.setTestOnBorrow(true);
            if (!config().getPassword().isEmpty()) {
                JEDIS_POOL = new JedisPool(pc, hostPort[0], Integer.parseInt(hostPort[1]), 5000, config().getPassword());
            } else {
                JEDIS_POOL = new JedisPool(pc, hostPort[0], Integer.parseInt(hostPort[1]));
            }
        }
        return JEDIS_POOL.getResource();
    }
}

RabbitMQ

public final class RabbitClient implements MQClient {
    private static Connection CONNECTION;
    private static List<Supplier<Channel>> LISTENERS = new ArrayList<>();
    @Override public String impl() { return "rabbit"; }
    @Override public Consumer<MQEvent> initProducer() {
        try {
            Channel ch = connection().createChannel();
            return ev -> {
                String msg = serialization().serialize(ev);
                String routingKey = config().namespace(".") + ev.getTopic() +
                    (ev.getTag() == null || ev.getTag().isEmpty() ? "" : "." + ev.getTag());
                try { ch.basicPublish(config().getExchange(), routingKey, null, msg.getBytes()); }
                catch (Exception e) { log.error("Publish MQ failed", e); }
            };
        } catch (IOException e) { throw new RuntimeException(e); }
    }
    @Override public void initConsumer(MQListener l) throws Exception {
        String queue = l.getGroup() + "." + config().namespace(".") +
            l.getListenerMethod().getDeclaringClass().getSimpleName() + "." + l.getListenerMethod().getName();
        List<String> routingKeys = new ArrayList<>();
        if (l.getTags() != null && !l.getTags().isEmpty()) {
            for (String tag : MQFilter.findIncludes(l.getTags())) {
                routingKeys.add(config().namespace(".") + l.getTopic() + "." + tag);
            }
        } else {
            routingKeys.add(config().namespace(".") + l.getTopic());
        }
        try (Channel ch = connection().createChannel()) {
            ch.queueDeclare(queue, true, false, false, null);
            for (String rk : routingKeys) { ch.queueBind(queue, config().getExchange(), rk); }
            DeliverCallback cb = (ctag, delivery) -> {
                String msg = new String(delivery.getBody(), StandardCharsets.UTF_8);
                MQEvent ev = serialization().deserialize(msg, l.getEventClass());
                if (!MQFilter.matchExps(ev.getTag(), l.getTags())) return;
                try {
                    consume(l, ev);
                    if (!config().isAutoAck()) ch.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                } catch (Exception e) { log.error("Consume MQ failed", ev, e); }
            };
            ch.basicConsume(queue, config().isAutoAck(), cb, consumerTag -> {});
            LISTENERS.add(() -> ch);
        }
    }
    @Override public void start() { LISTENERS.forEach(Supplier::get); }
    @Override public void destroy() throws Exception {
        for (Supplier<Channel> s : LISTENERS) { s.get().close(); }
        if (CONNECTION != null) CONNECTION.close();
    }
    private Connection connection() {
        if (CONNECTION == null) {
            ConnectionFactory factory = new ConnectionFactory();
            String[] hp = config().getServer().split(":");
            factory.setHost(hp[0]);
            factory.setPort(Integer.parseInt(hp[1]));
            factory.setUsername(config().getUsername());
            factory.setPassword(config().getPassword());
            try { CONNECTION = factory.newConnection(); } catch (IOException | TimeoutException e) { throw new RuntimeException(e); }
        }
        return CONNECTION;
    }
}

Overall Package Structure

config – contains BaseMQProperties and the auto‑configuration class that collects all MQClient beans, calls init() and start().

core – defines the core abstractions ( MQClient, MQEvent, @MQEventListener, MQListener, serialization, storage, filtering).

impl – provides the JSON serializer and the four concrete client implementations (Rocket, Kafka, Redis, Rabbit).

The component can be switched between implementations by setting base-mq.impl (e.g., rocket, kafka, redis, rabbit) and providing the corresponding connection properties. Adding a new MQ simply requires implementing MQClient and registering the bean.

JavaMicroservicesRedisSpringKafkaMessage QueueRabbitMQRocketMQ
Architect's Journey
Written by

Architect's Journey

E‑commerce, SaaS, AI architect; DDD enthusiast; SKILL enthusiast

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.