Unlock Ultra‑Fast Java Messaging with Disruptor: A Hands‑On Guide
This article introduces the high‑performance Java Disruptor library, explains its core concepts such as Ring Buffer, Sequencer, and Wait Strategies, and provides a step‑by‑step Spring Boot demo with full code to build a lock‑free producer‑consumer message queue.
1. Background
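Many projects need a message queue. When the producer and consumer run in the same JVM, Disruptor offers a fast, open-source, in-memory alternative to an external broker such as Kafka or RabbitMQ. It does not provide persistence or cross-process delivery.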
2. Disruptor Overview
Disruptor is a high-performance inter-thread queue for Java developed by LMAX. It was created after LMAX found that the latency of conventional in-memory queues was of the same order as I/O latency.
LMAX reports that its Disruptor-based system can handle up to 6 million orders per second on a single thread.
It is designed for the producer‑consumer problem, providing maximum throughput (TPS) and minimal latency.
3. Core Concepts
The main components of Disruptor are:
Ring Buffer : a circular buffer that stores events; since version 3.0 its role is limited to storing and updating data.
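Sequence : a monotonically increasing counter that tracks the progress of one component, such as the ring buffer cursor or an individual consumer; each Sequence is padded so that neighbouring counters do not suffer from false sharing.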
Sequencer : the core interface with implementations SingleProducerSequencer and MultiProducerSequencer that manage data transfer between producers and consumers.
Sequence Barrier : ensures consumers see the correct published sequence and defines when a consumer can process events.
Wait Strategy : determines how a consumer waits for the next event (multiple strategies are provided for different performance characteristics).
Event : the data object exchanged between producer and consumer, defined by the user.
EventProcessor : holds a consumer's sequence and runs the event‑handling loop.
EventHandler : user‑implemented interface that processes events.
Producer : any code that publishes events to the Disruptor.
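To make the concepts above more concrete, here is a minimal sketch of a single-producer, single-consumer ring buffer written with plain JDK classes. It is not Disruptor's source code: the names MiniRing, Slot, publish, and take are hypothetical, and the sketch assumes exactly one producer thread and one consumer thread. It shows preallocated events, a cursor sequence, a consumer sequence, index masking (the reason the buffer size must be a power of 2), and a simple yielding wait.

```java
import java.util.concurrent.atomic.AtomicLong;

/**
 * Illustrative single-producer / single-consumer ring buffer.
 * NOT the Disruptor implementation; every name here is hypothetical.
 */
public class MiniRing {
    /** Event: a preallocated, mutable slot that is reused on every lap. */
    private static final class Slot { long value; }

    private final Slot[] slots;
    private final int mask;                                  // size - 1, valid only for power-of-two sizes
    private final AtomicLong cursor = new AtomicLong(-1);    // last published sequence
    private final AtomicLong consumed = new AtomicLong(-1);  // last consumed sequence

    public MiniRing(int size) {
        if (Integer.bitCount(size) != 1) {
            throw new IllegalArgumentException("size must be a power of two");
        }
        slots = new Slot[size];
        for (int i = 0; i < size; i++) slots[i] = new Slot(); // allocate all events up front
        mask = size - 1;
    }

    /** Producer: claim the next sequence, fill its slot, then publish by advancing the cursor. */
    public void publish(long value) {
        long next = cursor.get() + 1;
        while (next - consumed.get() > slots.length) Thread.yield(); // buffer full: wait for the consumer
        slots[(int) (next & mask)].value = value;                    // sequence -> index via bit mask
        cursor.set(next);                                            // volatile write makes the slot visible
    }

    /** Consumer: wait until the cursor reaches our next sequence, then read the slot. */
    public long take() {
        long next = consumed.get() + 1;
        while (cursor.get() < next) Thread.yield();                  // simple yielding wait
        long v = slots[(int) (next & mask)].value;
        consumed.set(next);                                          // frees the slot for reuse
        return v;
    }

    public static void main(String[] args) throws InterruptedException {
        MiniRing ring = new MiniRing(8);
        long[] sum = new long[1];
        Thread consumer = new Thread(() -> {
            for (int i = 0; i < 100; i++) sum[0] += ring.take();
        });
        consumer.start();
        for (int i = 1; i <= 100; i++) ring.publish(i);
        consumer.join();
        System.out.println(sum[0]); // 5050
    }
}
```

The real library adds much more (batching, multiple consumers, dependency graphs between handlers, pluggable wait strategies), so treat this only as a mental model for the terms in the list.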
4. Demo – Step‑by‑Step Implementation
Below is a complete Spring Boot example that creates a Disruptor‑based message queue.
1) Add Maven dependency
<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.4.4</version>
</dependency>
2) Define the message model
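import lombok.Data;

/**
 * Message body. Lombok's @Data generates the getters, setters, and toString()
 * (requires the Lombok dependency).
 */
@Data
public class MessageModel {
    private String message;
}
3) Create an EventFactory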
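import com.lmax.disruptor.EventFactory;

/** Creates the event objects that pre-fill the ring buffer at startup. */
public class HelloEventFactory implements EventFactory<MessageModel> {
    @Override
    public MessageModel newInstance() {
        return new MessageModel();
    }
}
4) Implement an EventHandler (consumer)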
import com.lmax.disruptor.EventHandler;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class HelloEventHandler implements EventHandler<MessageModel> {
    @Override
    public void onEvent(MessageModel event, long sequence, boolean endOfBatch) {
        log.info("Consumer starts processing");
        try {
            Thread.sleep(1000); // simulate slow processing
            if (event != null) {
                log.info("Consumed message: {}", event);
            }
        } catch (Exception e) {
            log.error("Consumer processing failed", e); // log failures at error level with the cause
        }
        log.info("Consumer finished processing");
    }
}
5) Bean manager for ApplicationContext (utility)
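import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

/** Static access to the Spring ApplicationContext for code that Spring does not manage. */
@Component
public class BeanManager implements ApplicationContextAware {
    private static ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext ctx) throws BeansException {
        BeanManager.applicationContext = ctx; // static field: assign via the class, not "this"
    }

    public static ApplicationContext getApplicationContext() { return applicationContext; }
    public static Object getBean(String name) { return applicationContext.getBean(name); }
    public static <T> T getBean(Class<T> clazz) { return applicationContext.getBean(clazz); }
}
6) Configure the Disruptor (MQManager)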
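import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.concurrent.Executors;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MQManager {
    @Bean("messageModel")
    public RingBuffer<MessageModel> messageModelRingBuffer() {
        HelloEventFactory factory = new HelloEventFactory();
        int bufferSize = 1024 * 256; // must be a power of 2
        // The Executor-based constructor is deprecated in 3.4.x, so pass a ThreadFactory instead.
        // ProducerType.SINGLE assumes only one thread ever publishes; use MULTI otherwise.
        Disruptor<MessageModel> disruptor = new Disruptor<>(factory, bufferSize,
                Executors.defaultThreadFactory(), ProducerType.SINGLE, new BlockingWaitStrategy());
        disruptor.handleEventsWith(new HelloEventHandler());
        disruptor.start();
        return disruptor.getRingBuffer();
    }
}
7) Service interface and implementation (producer)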
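// DisruptorMqService.java
public interface DisruptorMqService {
    /** Send a message. */
    void sayHelloMq(String message);
}

// DisruptorMqServiceImpl.java
import com.lmax.disruptor.RingBuffer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Slf4j
@Service // @Service already implies @Component
public class DisruptorMqServiceImpl implements DisruptorMqService {
    @Autowired
    private RingBuffer<MessageModel> messageModelRingBuffer;

    @Override
    public void sayHelloMq(String message) {
        log.info("Received message: {}", message);
        long sequence = messageModelRingBuffer.next(); // claim the next slot
        try {
            MessageModel event = messageModelRingBuffer.get(sequence); // reuse the preallocated event
            event.setMessage(message);
            log.info("Added event to queue: {}", event);
        } catch (Exception e) {
            log.error("Failed to add event", e);
        } finally {
            // Always publish a claimed sequence, otherwise consumers stall on it.
            messageModelRingBuffer.publish(sequence);
        }
    }
}
8) Test class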
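import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(classes = DemoApplication.class)
public class DemoApplicationTests {
    @Autowired
    private DisruptorMqService disruptorMqService;

    @Test
    public void sayHelloMqTest() throws Exception {
        disruptorMqService.sayHelloMq("Message arrived, Hello world!");
        log.info("Message sent to queue");
        Thread.sleep(2000); // keep the test alive while the consumer handles the event asynchronously
    }
}
5. Summary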
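The producer-consumer pattern is common, and Disruptor implements it in memory with a preallocated ring buffer and lock-free sequence coordination instead of a locked queue, which accounts for much of its performance. Note that the BlockingWaitStrategy used in the demo does take a lock while a consumer waits; strategies such as YieldingWaitStrategy or BusySpinWaitStrategy avoid that at the cost of more CPU.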
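As a rough illustration of what coordinating without locks means, the sketch below lets several threads claim sequence numbers with compare-and-set on an AtomicLong instead of a synchronized block. The class name CasClaim and its claim method are hypothetical, and this is a simplified model of the idea rather than Disruptor's actual sequencer code.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch: claiming sequences with compare-and-set instead of a lock. */
public class CasClaim {
    private final AtomicLong next = new AtomicLong(0);

    /** Returns a sequence number that no other thread will receive. */
    public long claim() {
        while (true) {
            long current = next.get();
            if (next.compareAndSet(current, current + 1)) {
                return current; // this thread won the race for "current"
            }
            // Another thread advanced the counter first; retry with the fresh value.
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CasClaim claimer = new CasClaim();
        int threads = 4;
        int perThread = 10_000;
        boolean[] seen = new boolean[threads * perThread];
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    seen[(int) claimer.claim()] = true; // each claimed sequence marks its own index
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) w.join();
        int count = 0;
        for (boolean b : seen) if (b) count++;
        System.out.println(count); // 40000: every sequence was claimed exactly once
    }
}
```

No thread ever blocks here: a thread that loses the race simply retries, which is the trade-off lock-free designs make in exchange for avoiding lock contention and context switches.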
