Why Disruptor Beats Traditional Queues: Inside Java’s High‑Performance RingBuffer
This article explains how the Disruptor library implements lock‑free, bounded queues using a ring‑buffer, cache‑line padding and CAS operations, compares single‑ and multi‑producer sequencers, and provides practical Java code examples for building ultra‑low‑latency producer‑consumer pipelines.
Queues under java.util.concurrent
JUC queues typically use locks to guarantee thread safety, which can cause significant performance overhead and priority inversion; lock‑free implementations are unbounded and cannot enforce size limits, while locked queues can be bounded to prevent memory overflow in high‑stability systems.
Disruptor
What is Disruptor
Disruptor is a high‑performance bounded queue developed by LMAX to solve memory‑queue latency issues, suitable for producer‑consumer models.
Why Disruptor is fast
It uses a ring‑buffer (an array of slots) pre‑allocated in memory, reducing GC pressure; employs cache‑line padding to avoid false sharing; and leverages a long sequence with bitwise indexing (size must be a power of two) to quickly locate slots.
Consumers access slots via the sequence using CAS, minimizing synchronized blocks.
Core classes of Disruptor
RingBuffer : Stores and updates event objects.
Sequence : Marks producer and consumer positions, padded to prevent false sharing.
SequenceBarrier : Determines if events are available for consumption.
WaitStrategy : Balances producer‑consumer efficiency.
Event : User‑defined data unit.
EventProcessor : Main event loop handling events.
EventHandler : User‑implemented consumer interface.
WorkHandler : Interface for multiple consumers in work mode.
WorkProcessor : Ensures each sequence is consumed by only one processor.
WorkerPool : Pool of WorkProcessors.
LifecycleAware : Notifies components on start/stop.
Sequence
Sequence marks the positions of producers and consumers; its value is padded with seven long fields before and after to prevent false sharing.
Framework class relationship diagram
Cursored – Get current sequence value
public interface Cursored {
/**
* Get current sequence value
*/
long getCursor();
}Cursored provides a method to obtain the current sequence.
Sequenced – Sequence acquisition and publication
public interface Sequenced {
// Get buffer size
int getBufferSize();
// Check if there is available capacity
boolean hasAvailableCapacity(final int requiredCapacity);
// Get remaining capacity
long remainingCapacity();
// Acquire next sequence for publishing
long next();
// Acquire n sequences
long next(int n);
// Try to acquire a sequence, may throw InsufficientCapacityException
long tryNext() throws InsufficientCapacityException;
// Try to acquire n sequences
long tryNext(int n) throws InsufficientCapacityException;
// Publish a sequence
void publish(long sequence);
// Batch publish sequences
void publish(long lo, long hi);
}Sequencer
public interface Sequencer extends Cursored, Sequenced {
long INITIAL_CURSOR_VALUE = -1L;
void claim(long sequence);
boolean isAvailable(long sequence);
void addGatingSequences(Sequence... gatingSequences);
boolean removeGatingSequence(Sequence sequence);
SequenceBarrier newBarrier(Sequence... sequencesToTrack);
long getMinimumSequence();
long getHighestPublishedSequence(long nextSequence, long availableSequence);
<T> EventPoller<T> newPoller(DataProvider<T> provider, Sequence... gatingSequences);
}Sequencer methods are mainly used by event publishers; newBarrier() is used by event processors.
AbstractSequencer – Manages consumer and producer sequences
public abstract class AbstractSequencer implements Sequencer {
private static final AtomicReferenceFieldUpdater<AbstractSequencer, Sequence[]> SEQUENCE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences");
protected final int bufferSize;
protected final WaitStrategy waitStrategy;
protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
protected volatile Sequence[] gatingSequences = new Sequence[0];
// ... constructor checks bufferSize is power of 2 ...
@Override
public final long getCursor() { return cursor.get(); }
@Override
public final int getBufferSize() { return bufferSize; }
@Override
public final void addGatingSequences(Sequence... gatingSequences) {
SequenceGroups.addSequences(this, SEQUENCE_UPDATER, this, gatingSequences);
}
@Override
public final boolean removeGatingSequence(Sequence sequence) {
return SequenceGroups.removeSequence(this, SEQUENCE_UPDATER, sequence);
}
@Override
public final long getMinimumSequence() {
return Util.getMinimumSequence(gatingSequences, cursor.get());
}
@Override
public final SequenceBarrier newBarrier(Sequence... sequencesToTrack) {
return new ProcessingSequenceBarrier(this, waitStrategy, cursor, sequencesToTrack);
}
// ... other methods ...
}SingleProducerSequencer – Single‑threaded event publisher
It inherits AbstractSequencer, maintains cachedValue and nextValue, and is not thread‑safe.
public long next(int n) {
if (n < 1) throw new IllegalArgumentException("n must be > 0");
long nextValue = this.nextValue;
long nextSequence = nextValue + n;
long wrapPoint = nextSequence - bufferSize;
long cachedGatingSequence = this.cachedValue;
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) {
cursor.setVolatile(nextValue);
long minSequence;
while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) {
LockSupport.parkNanos(1L);
}
this.cachedValue = minSequence;
}
this.nextValue = nextSequence;
return nextSequence;
}
public void publish(long sequence) {
cursor.set(sequence);
waitStrategy.signalAllWhenBlocking();
}Practical single‑threaded producer example
public static void main(String[] args) {
Disruptor<TradeBO> disruptor = new Disruptor<>(
TradeBO::new, 2,
r -> { Thread t = new Thread(r); t.setName("Single‑Threaded Producer"); return t; },
ProducerType.SINGLE, new BlockingWaitStrategy());
disruptor.handleEventsWith(new ConsumerA());
disruptor.handleEventsWith(new ConsumerB());
disruptor.start();
for (int i = 1; i < 10; i++) {
final int id = i;
EventTranslator<TradeBO> translator = (event, seq) -> {
event.setId(id);
event.setPrice((double) id);
};
disruptor.publishEvent(translator);
}
disruptor.shutdown();
}
@Data
public class TradeBO { private Integer id; private Double price; }MultiProducerSequencer
Member variables
private static final Unsafe UNSAFE = Util.getUnsafe();
private static final long BASE = UNSAFE.arrayBaseOffset(int[].class);
private static final long SCALE = UNSAFE.arrayIndexScale(int[].class);
private final Sequence gatingSequenceCache = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
private final int[] availableBuffer;
private final int indexMask;
private final int indexShift;Constructor
public MultiProducerSequencer(int bufferSize, final WaitStrategy waitStrategy) {
super(bufferSize, waitStrategy);
availableBuffer = new int[bufferSize];
indexMask = bufferSize - 1;
indexShift = Util.log2(bufferSize);
initialiseAvailableBuffer();
}
private void initialiseAvailableBuffer() {
for (int i = availableBuffer.length - 1; i != 0; i--) {
setAvailableBufferValue(i, -1);
}
setAvailableBufferValue(0, -1);
}
private void setAvailableBufferValue(int index, int flag) {
long bufferAddress = (index * SCALE) + BASE;
UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);
}next() – Acquire sequence
public long next(int n) {
if (n < 1) throw new IllegalArgumentException("n must be > 0");
long current, next;
do {
current = cursor.get();
next = current + n;
long wrapPoint = next - bufferSize;
long cachedGatingSequence = gatingSequenceCache.get();
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) {
long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
if (wrapPoint > gatingSequence) { LockSupport.parkNanos(1); continue; }
gatingSequenceCache.set(gatingSequence);
} else if (cursor.compareAndSet(current, next)) {
break;
}
} while (true);
return next;
}publish() – Publish event
public void publish(final long sequence) {
setAvailable(sequence);
waitStrategy.signalAllWhenBlocking();
}
private void setAvailable(final long sequence) {
setAvailableBufferValue(calculateIndex(sequence), calculateAvailabilityFlag(sequence));
}
private int calculateIndex(final long sequence) { return (int) sequence & indexMask; }
private int calculateAvailabilityFlag(final long sequence) { return (int) (sequence >>> indexShift); }Differences between MultiProducerSequencer and SingleProducerSequencer
SingleProducerSequencer maintains cachedValue and nextValue with padding and is not thread‑safe; MultiProducerSequencer obtains sequences via atomic operations on a shared Sequence, making it safe for multiple producers.
RingBuffer
EventSequencer
public interface EventSequencer<T> extends DataProvider<T>, Sequenced { }DataProvider
public interface DataProvider<T> {
T get(long sequence);
}EventSink – Publishing methods
EventSink defines various publish methods that use EventTranslators to initialise events before publishing.
RingBufferPad – Cache‑line padding
RingBufferFields – Core storage logic
abstract class RingBufferFields<E> extends RingBufferPad {
private static final int BUFFER_PAD;
private static final long REF_ARRAY_BASE;
private static final int REF_ELEMENT_SHIFT;
private static final Unsafe UNSAFE = Util.getUnsafe();
private final long indexMask;
private final Object[] entries;
protected final int bufferSize;
protected final Sequencer sequencer;
RingBufferFields(EventFactory<E> eventFactory, Sequencer sequencer) {
this.sequencer = sequencer;
this.bufferSize = sequencer.getBufferSize();
if (bufferSize < 1) throw new IllegalArgumentException("bufferSize must not be less than 1");
if (Integer.bitCount(bufferSize) != 1) throw new IllegalArgumentException("bufferSize must be a power of 2");
this.indexMask = bufferSize - 1;
this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
fill(eventFactory);
}
private void fill(EventFactory<E> eventFactory) {
for (int i = 0; i < bufferSize; i++) {
entries[BUFFER_PAD + i] = eventFactory.newInstance();
}
}
@SuppressWarnings("unchecked")
protected final E elementAt(long sequence) {
return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));
}
}SequenceBarrier – Consumer usage
public interface SequenceBarrier {
long waitFor(long sequence) throws AlertException, InterruptedException, TimeoutException;
long getCursor();
boolean isAlerted();
void alert();
void clearAlert();
void checkAlert() throws AlertException;
}ProcessingSequenceBarrier
final class ProcessingSequenceBarrier implements SequenceBarrier {
private final WaitStrategy waitStrategy;
private final Sequence dependentSequence;
private volatile boolean alerted = false;
private final Sequence cursorSequence;
private final Sequencer sequencer;
ProcessingSequenceBarrier(Sequencer sequencer, WaitStrategy waitStrategy, Sequence cursorSequence, Sequence[] dependentSequences) {
this.sequencer = sequencer;
this.waitStrategy = waitStrategy;
this.cursorSequence = cursorSequence;
this.dependentSequence = (dependentSequences.length == 0) ? cursorSequence : new FixedSequenceGroup(dependentSequences);
}
@Override
public long waitFor(final long sequence) throws AlertException, InterruptedException, TimeoutException {
checkAlert();
long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
if (availableSequence < sequence) return availableSequence;
return sequencer.getHighestPublishedSequence(sequence, availableSequence);
}
@Override public long getCursor() { return dependentSequence.get(); }
@Override public boolean isAlerted() { return alerted; }
@Override public void alert() { alerted = true; waitStrategy.signalAllWhenBlocking(); }
@Override public void clearAlert() { alerted = false; }
@Override public void checkAlert() throws AlertException { if (alerted) throw AlertException.INSTANCE; }
}EventProcessor – Event handling interface
public interface EventProcessor extends Runnable {
Sequence getSequence();
void halt();
boolean isRunning();
}BatchEventProcessor – Single‑threaded processing
public final class BatchEventProcessor<T> implements EventProcessor {
// run method omitted for brevity – it clears alerts, notifies start, processes events in batches using the SequenceBarrier, handles timeouts, alerts and exceptions, then notifies shutdown.
}WorkProcessor – Multi‑threaded work mode
public void run() {
if (!running.compareAndSet(false, true)) throw new IllegalStateException("Thread is already running");
sequenceBarrier.clearAlert();
notifyStart();
boolean processedSequence = true;
long cachedAvailableSequence = Long.MIN_VALUE;
long nextSequence = sequence.get();
T event = null;
while (true) {
try {
if (processedSequence) {
processedSequence = false;
do {
nextSequence = workSequence.get() + 1L;
sequence.set(nextSequence - 1L);
} while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));
}
if (cachedAvailableSequence >= nextSequence) {
event = ringBuffer.get(nextSequence);
workHandler.onEvent(event);
processedSequence = true;
} else {
cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence);
}
} catch (TimeoutException e) {
notifyTimeout(sequence.get());
} catch (AlertException ex) {
if (!running.get()) break;
} catch (Throwable ex) {
exceptionHandler.handleEventException(ex, nextSequence, event);
processedSequence = true;
}
}
notifyShutdown();
running.set(false);
}WorkerPool
Combines multiple WorkProcessors; maintains a shared workSequence that ensures each event is processed by only one worker.
WaitStrategy – Waiting policies
BlockingWaitStrategy – uses locks, low CPU, higher latency.
BusySpinWaitStrategy – busy‑spins, low latency, high CPU.
LiteBlockingWaitStrategy – lightweight blocking.
SleepingWaitStrategy – mixes spin, yield, and sleep.
TimeoutBlockingWaitStrategy – blocks with timeout, throws on timeout.
YieldingWaitStrategy – spins then yields.
PhasedBackoffWaitStrategy – spin, then yield, then fallback strategy.
Practical multi‑threaded consumer example
public static void main(String[] args) {
RingBuffer<TradeBO> ringBuffer = RingBuffer.createSingleProducer(TradeBO::new, 2);
WorkerPool<TradeBO> workerPool = new WorkerPool<>(
ringBuffer,
ringBuffer.newBarrier(),
new IgnoreExceptionHandler(),
new ConsumerC(), new ConsumerD()
);
ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
Executor executor = Executors.newFixedThreadPool(4);
workerPool.start(executor);
for (int i = 0; i < 4; i++) {
final int id = i;
EventTranslator<TradeBO> translator = (event, seq) -> {
event.setId(id);
event.setPrice((double) id);
};
ringBuffer.publishEvent(translator);
System.out.println("Publish[" + id + "]");
}
}DSL – Defining consumer dependencies
Consumers can be chained so that one runs after another, e.g.:
dw.consumeWith(handler1a, handler2a);
dw.after(handler1a).consumeWith(handler1b);
dw.after(handler2a).consumeWith(handler2b);
dw.after(handler1b, handler2b).consumeWith(handler3);
ProducerBarrier producerBarrier = dw.createProducerBarrier();Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactand we will review it promptly.
Programmer DD
A tinkering programmer and author of "Spring Cloud Microservices in Action"
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
