Backend Development 10 min read

Refactoring a Message Bus with Guava EventBus: Improving Broadcast Notifications and Replacing Chain‑of‑Responsibility

This article describes how to redesign a Java‑based message bus by integrating Guava EventBus to streamline broadcast notifications and replace the traditional chain‑of‑responsibility pattern with an event‑driven approach, providing code examples and deployment considerations.

Architecture Digest
Architecture Digest
Architecture Digest
Refactoring a Message Bus with Guava EventBus: Improving Broadcast Notifications and Replacing Chain‑of‑Responsibility

Recently the author spent considerable time refactoring a message bus, introducing Guava's EventBus to address two scenarios: improving broadcast notifications and using event‑driven processing to replace the chain‑of‑responsibility pattern.

EventBus Overview EventBus, part of Google Guava, implements the observer pattern and enables decoupled communication between components within an application.

Improving Broadcast Notifications Previously, clients received broadcast notifications via a SDK method:

public void setNotificationListener(IMessageReceiveListener notificationListener);

The old design relied on a RabbitMQ‑based tree routing topology that introduced “virtual queues” which could send but not receive messages, creating a major flaw. By shifting the focus to a pub/sub component (e.g., Redis or Zookeeper) that each client pool connects to, the author created a more reliable broadcast channel.

Because the pub/sub component operates at the pool level, an additional translation layer using EventBus is required to forward broadcast events from the pool to individual clients:

public class NotifyHandler implements IPubSubListener {
    @Override
    public void onChange(String channel, byte[] data, Map<String, Object> params) {
        NotifyEvent notifyEvent = new NotifyEvent();
        Message broadcastMsg = pubsuberManager.deserialize(data, Message.class);
        if (broadcastMsg != null && broadcastMsg.getMessageType().equals(MessageType.BroadcastMessage)) {
            notifyEvent.setMsg(broadcastMsg);
            getComponentEventBus().post(notifyEvent);
        }
    }
}

EventBus distributes the posted event to registered subscribers. The bus used here is an asynchronous instance that processes handlers on a separate thread. Handlers are registered via the client:

public void registerEventProcessor(Object eventProcessor) {
    componentEventBus.register(eventProcessor);
}

A sample subscriber for broadcast notifications:

public static class NotificationEventProcessor {
    @Subscribe
    public void onNotification(NotifyEvent event) {
        logger.info("onNotification");
        Message message = event.getMsg();
        assertNotNull(message);
        assertEquals("test", new String(message.getContent(), Constants.CHARSET_OF_UTF8));
    }
}

Unregistering is straightforward:

public void testBroadcast() throws Exception {
    String secret = "kljasdoifqoikjhhhqwhebasdfasdf";
    Message msg = MessageFactory.createMessage(MessageType.BroadcastMessage);
    msg.setContentType("text/plain");
    msg.setContentEncoding("utf-8");
    msg.setContent("test".getBytes(Constants.CHARSET_OF_UTF8));
    NotificationEventProcessor eventProcessor = new NotificationEventProcessor();
    client.registerEventProcessor(eventProcessor);
    client.broadcast(secret, new Message[]{msg});
    TimeUnit.SECONDS.sleep(10);
    client.unregisterEventProcessor(eventProcessor);
}

Event‑Driven Replacement of Chain‑of‑Responsibility The original implementation used a recursive chain of responsibility, which could cause memory‑leak‑like behavior when long‑running connections never returned. By treating each logical step as an event, the same sequencing is achieved without recursion.

Example of defining a producer event processor and its events:

public class ProduceEventProcessor extends CommonEventProcessor {
    // event definitions
    public static class ValidateEvent extends CarryEvent {}
    public static class PermissionCheckEvent extends CarryEvent {}
    public static class ProduceEvent extends CarryEvent {}
}

Event handler methods are annotated with @Subscribe :

@Subscribe
public void onValidate(ValidateEvent event) {}

@Subscribe
public void onPermissionCheckEvent(PermissionCheckEvent event) {}

@Subscribe
public void onProduce(ProduceEvent event) {}

Registration and event emission:

EventBus carryEventBus = this.getContext().getCarryEventBus();
ProduceEventProcessor eventProcessor = new ProduceEventProcessor();
carryEventBus.register(eventProcessor);

// initialize events
ProduceEventProcessor.ValidateEvent validateEvent = new ProduceEventProcessor.ValidateEvent();
// ... other event instances ...

// publish in the required order
carryEventBus.post(validateEvent);
carryEventBus.post(msgBodySizeCheckEvent);
carryEventBus.post(permissionCheckEvent);
carryEventBus.post(msgIdGenerateEvent);
carryEventBus.post(msgBodyCompressEvent);
carryEventBus.post(produceEvent);

This approach eliminates recursive calls, keeps processors loosely coupled, and allows easy extension by defining new events, adding handler methods, instantiating the events, and inserting them into the existing sequence.

For the full source code, see the banyan project.

backendjavaGuavaevent-drivenEventBusMessageBus
Architecture Digest
Written by

Architecture Digest

Focusing on Java backend development, covering application architecture from top-tier internet companies (high availability, high performance, high stability), big data, machine learning, Java architecture, and other popular fields.

0 followers
Reader feedback

How this landed with the community

login Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.