Why Disruptor Beats Traditional Queues: Inside Java’s High‑Performance RingBuffer

This article explains how the Disruptor library implements lock‑free, bounded queues using a ring‑buffer, cache‑line padding and CAS operations, compares single‑ and multi‑producer sequencers, and provides practical Java code examples for building ultra‑low‑latency producer‑consumer pipelines.

Programmer DD

Queues under java.util.concurrent

Queues in java.util.concurrent (JUC) fall into two groups. Lock-based queues such as ArrayBlockingQueue and LinkedBlockingQueue can be bounded, which protects a system that needs high stability from running out of memory, but lock contention adds significant overhead and can cause priority inversion. Lock-free queues such as ConcurrentLinkedQueue avoid locks but are unbounded, so they cannot enforce a size limit.
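
The trade-off can be seen with two standard JUC classes. The following is a minimal sketch; the class name QueueBoundsDemo and the helper fill are made up for illustration.

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;

class QueueBoundsDemo {
    /** Offers `attempts` items without blocking and returns how many were accepted. */
    static int fill(Queue<Integer> queue, int attempts) {
        int accepted = 0;
        for (int i = 0; i < attempts; i++) {
            if (queue.offer(i)) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        // Lock-based and bounded: rejects offers once 4 items are queued.
        System.out.println(fill(new ArrayBlockingQueue<>(4), 10));
        // Lock-free and unbounded: accepts everything, memory is the only limit.
        System.out.println(fill(new ConcurrentLinkedQueue<>(), 10));
    }
}
```

With these arguments the bounded queue accepts 4 of the 10 offers, while the unbounded queue accepts all 10.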

Disruptor

What is Disruptor

Disruptor is a high-performance bounded in-memory queue developed by LMAX to reduce the latency of passing messages between threads. It fits producer-consumer models.

Why Disruptor is fast

It uses a ring buffer: an array of slots that is pre-allocated in memory and reused, which reduces GC pressure. It pads hot fields to avoid false sharing between cache lines. It also maps an ever-increasing long sequence to a slot with a bitwise AND, sequence & (size - 1), which is why the buffer size must be a power of two.

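Producers claim slots through the sequence. A single producer needs no CAS at all, and multiple producers claim with a CAS on the shared cursor, so the hot path contains no synchronized blocks. Consumers track their own sequences and read slots only after they have been published.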
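
The bitwise indexing described above can be sketched in a few lines. RingIndex is a hypothetical helper, not a Disruptor class.

```java
class RingIndex {
    private final int mask;

    RingIndex(int bufferSize) {
        if (bufferSize < 1 || Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        this.mask = bufferSize - 1; // e.g. 8 -> 0b0111
    }

    /** Same result as sequence % bufferSize for non-negative sequences, using one AND. */
    int indexOf(long sequence) {
        return (int) (sequence & mask);
    }
}
```

For a buffer of 8 slots, sequences 1 and 9 both map to slot 1, so the sequence can grow indefinitely while the index wraps around the array.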

Core classes of Disruptor

RingBuffer : Stores and updates event objects.

Sequence : Marks producer and consumer positions, padded to prevent false sharing.

SequenceBarrier : Determines if events are available for consumption.

WaitStrategy : Decides how a consumer waits for the next event (block, spin, yield or sleep), trading CPU usage against latency.

Event : User‑defined data unit.

EventProcessor : Main event loop handling events.

EventHandler : User‑implemented consumer interface.

WorkHandler : Interface for multiple consumers in work mode.

WorkProcessor : Ensures each sequence is consumed by only one processor.

WorkerPool : Pool of WorkProcessors.

LifecycleAware : Callback interface that a handler can implement to be notified when its processing thread starts and shuts down.

Sequence

Sequence marks the positions of producers and consumers. Its value is surrounded by seven long padding fields on each side, so that no other hot field lands on the same cache line (typically 64 bytes) and false sharing is avoided.
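
A padded counter along these lines might look as follows. This is a sketch built on AtomicLongFieldUpdater; the class names are illustrative and the real Sequence class differs in its details.

```java
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

// Padding lives in separate superclasses because field order inside one class is not
// guaranteed, while superclass fields are generally laid out before subclass fields.
class LhsPadding { protected long p1, p2, p3, p4, p5, p6, p7; }

class Value extends LhsPadding { protected volatile long value; }

class RhsPadding extends Value { protected long p9, p10, p11, p12, p13, p14, p15; }

/** Hypothetical stand-in for a padded sequence counter. */
class PaddedSequence extends RhsPadding {
    private static final AtomicLongFieldUpdater<Value> UPDATER =
        AtomicLongFieldUpdater.newUpdater(Value.class, "value");

    PaddedSequence(long initialValue) { UPDATER.set(this, initialValue); }

    long get() { return value; }

    /** Ordered write: cheaper than a full volatile store. */
    void set(long newValue) { UPDATER.lazySet(this, newValue); }

    boolean compareAndSet(long expected, long update) {
        return UPDATER.compareAndSet(this, expected, update);
    }
}
```

Seven longs are 56 bytes, so together with the 8-byte value each side fills a 64-byte line. Whether the JVM keeps exactly this layout depends on the runtime, which is an assumption of the sketch.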

Framework class relationship diagram

Cursored – Get current sequence value

public interface Cursored {
    /**
     * Get current sequence value
     */
    long getCursor();
}

Cursored provides a method to obtain the current sequence.

Sequenced – Sequence acquisition and publication

public interface Sequenced {
    // Get buffer size
    int getBufferSize();
    // Check if there is available capacity
    boolean hasAvailableCapacity(final int requiredCapacity);
    // Get remaining capacity
    long remainingCapacity();
    // Acquire next sequence for publishing
    long next();
    // Acquire n sequences
    long next(int n);
    // Try to acquire a sequence, may throw InsufficientCapacityException
    long tryNext() throws InsufficientCapacityException;
    // Try to acquire n sequences
    long tryNext(int n) throws InsufficientCapacityException;
    // Publish a sequence
    void publish(long sequence);
    // Batch publish sequences
    void publish(long lo, long hi);
}

Sequencer

public interface Sequencer extends Cursored, Sequenced {
    long INITIAL_CURSOR_VALUE = -1L;
    void claim(long sequence);
    boolean isAvailable(long sequence);
    void addGatingSequences(Sequence... gatingSequences);
    boolean removeGatingSequence(Sequence sequence);
    SequenceBarrier newBarrier(Sequence... sequencesToTrack);
    long getMinimumSequence();
    long getHighestPublishedSequence(long nextSequence, long availableSequence);
    <T> EventPoller<T> newPoller(DataProvider<T> provider, Sequence... gatingSequences);
}

Sequencer methods are mainly used by event publishers; newBarrier() is used by event processors.

AbstractSequencer – Manages consumer and producer sequences

public abstract class AbstractSequencer implements Sequencer {
    private static final AtomicReferenceFieldUpdater<AbstractSequencer, Sequence[]> SEQUENCE_UPDATER =
        AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences");
    protected final int bufferSize;
    protected final WaitStrategy waitStrategy;
    protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    protected volatile Sequence[] gatingSequences = new Sequence[0];
    // ... constructor checks bufferSize is power of 2 ...
    @Override
    public final long getCursor() { return cursor.get(); }
    @Override
    public final int getBufferSize() { return bufferSize; }
    @Override
    public final void addGatingSequences(Sequence... gatingSequences) {
        SequenceGroups.addSequences(this, SEQUENCE_UPDATER, this, gatingSequences);
    }
    @Override
    public final boolean removeGatingSequence(Sequence sequence) {
        return SequenceGroups.removeSequence(this, SEQUENCE_UPDATER, sequence);
    }
    @Override
    public final long getMinimumSequence() {
        return Util.getMinimumSequence(gatingSequences, cursor.get());
    }
    @Override
    public final SequenceBarrier newBarrier(Sequence... sequencesToTrack) {
        return new ProcessingSequenceBarrier(this, waitStrategy, cursor, sequencesToTrack);
    }
    // ... other methods ...
}

SingleProducerSequencer – Single‑threaded event publisher

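It extends AbstractSequencer and keeps two plain fields: nextValue, the last sequence claimed, and cachedValue, the last observed position of the slowest consumer. Because these fields are updated without synchronization, it is not thread-safe and must only be used from one publishing thread.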

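public long next(int n) {
    if (n < 1) throw new IllegalArgumentException("n must be > 0");
    long nextValue = this.nextValue;               // last sequence claimed so far
    long nextSequence = nextValue + n;             // highest sequence of this claim
    long wrapPoint = nextSequence - bufferSize;    // sequence that previously occupied the slot being claimed
    long cachedGatingSequence = this.cachedValue;  // last known position of the slowest consumer
    // Slow path: re-read the consumers only when the cached value suggests the buffer may be full.
    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) {
        cursor.setVolatile(nextValue);
        long minSequence;
        // Wait until the slowest consumer has moved past the slot to be overwritten.
        while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) {
            LockSupport.parkNanos(1L);
        }
        this.cachedValue = minSequence;
    }
    this.nextValue = nextSequence;
    return nextSequence;
}

public void publish(long sequence) {
    cursor.set(sequence);                  // makes the event visible to consumers
    waitStrategy.signalAllWhenBlocking();  // wakes consumers blocked in the wait strategy
}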
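
The wrap-point check is easier to follow in isolation. The sketch below is a simplified, non-blocking variant: SingleProducerClaim, tryNext and consumedUpTo are hypothetical names, one field stands in for the gating sequences, and -1 is returned instead of waiting.

```java
class SingleProducerClaim {
    private final int bufferSize;
    private long nextValue = -1L;                  // last sequence claimed by the producer
    private long cachedGating = -1L;               // last observed consumer position
    private volatile long consumerSequence = -1L;  // stand-in for the gating sequences

    SingleProducerClaim(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    /** Returns the claimed sequence, or -1 if the claim would overwrite an unconsumed slot. */
    long tryNext() {
        long nextSequence = nextValue + 1;
        long wrapPoint = nextSequence - bufferSize;
        if (wrapPoint > cachedGating) {
            cachedGating = consumerSequence;  // refresh the cache only when it looks full
            if (wrapPoint > cachedGating) {
                return -1L;                   // still full
            }
        }
        nextValue = nextSequence;
        return nextSequence;
    }

    /** Called on behalf of the consumer once it has finished `sequence`. */
    void consumedUpTo(long sequence) {
        consumerSequence = sequence;
    }
}
```

With a buffer of 4, the first four claims succeed (0 to 3), the fifth fails, and it succeeds once the consumer reports sequence 0 as consumed. The volatile consumer position is read only on the slow path, which is the point of the cached value.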

Practical single‑threaded producer example

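public static void main(String[] args) {
    // Ring buffer of 2 slots (must be a power of two), one producer, blocking consumers.
    Disruptor<TradeBO> disruptor = new Disruptor<>(
        TradeBO::new, 2,
        // The ThreadFactory creates the consumer threads, not the producer thread.
        r -> { Thread t = new Thread(r); t.setName("disruptor-consumer"); return t; },
        ProducerType.SINGLE, new BlockingWaitStrategy());
    // ConsumerA and ConsumerB are EventHandler<TradeBO> implementations (not shown).
    // Registered separately, each of them receives every event.
    disruptor.handleEventsWith(new ConsumerA());
    disruptor.handleEventsWith(new ConsumerB());
    disruptor.start();
    // The main thread is the single producer; it publishes nine events (ids 1 to 9).
    for (int i = 1; i < 10; i++) {
        final int id = i;
        EventTranslator<TradeBO> translator = (event, seq) -> {
            event.setId(id);
            event.setPrice((double) id);
        };
        disruptor.publishEvent(translator);
    }
    // Waits until the published events have been processed, then halts the consumers.
    disruptor.shutdown();
}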

@Data
public class TradeBO { private Integer id; private Double price; }

MultiProducerSequencer

Member variables

private static final Unsafe UNSAFE = Util.getUnsafe();
private static final long BASE = UNSAFE.arrayBaseOffset(int[].class);
private static final long SCALE = UNSAFE.arrayIndexScale(int[].class);
private final Sequence gatingSequenceCache = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
private final int[] availableBuffer;
private final int indexMask;
private final int indexShift;

Constructor

public MultiProducerSequencer(int bufferSize, final WaitStrategy waitStrategy) {
    super(bufferSize, waitStrategy);
    availableBuffer = new int[bufferSize];
    indexMask = bufferSize - 1;
    indexShift = Util.log2(bufferSize);
    initialiseAvailableBuffer();
}
private void initialiseAvailableBuffer() {
    for (int i = availableBuffer.length - 1; i != 0; i--) {
        setAvailableBufferValue(i, -1);
    }
    setAvailableBufferValue(0, -1);
}
private void setAvailableBufferValue(int index, int flag) {
    long bufferAddress = (index * SCALE) + BASE;
    UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);
}

next() – Acquire sequence

public long next(int n) {
    if (n < 1) throw new IllegalArgumentException("n must be > 0");
    long current, next;
    do {
        current = cursor.get();
        next = current + n;
        long wrapPoint = next - bufferSize;
        long cachedGatingSequence = gatingSequenceCache.get();
        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) {
            // The cache suggests the buffer may be full: re-read the slowest consumer.
            long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
            if (wrapPoint > gatingSequence) { LockSupport.parkNanos(1); continue; } // still full, retry
            gatingSequenceCache.set(gatingSequence);
        } else if (cursor.compareAndSet(current, next)) {
            // CAS succeeded: sequences current + 1 .. next now belong to this producer.
            break;
        }
        // Otherwise another producer won the CAS or the cache was refreshed: loop again.
    } while (true);
    return next;
}
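
Stripped of the capacity check, the claim is an ordinary CAS retry loop. The sketch below uses AtomicLong in place of Sequence; CasClaim is a hypothetical name and the gating logic is deliberately left out.

```java
import java.util.concurrent.atomic.AtomicLong;

class CasClaim {
    private final AtomicLong cursor = new AtomicLong(-1L);

    /** Claims n consecutive sequences and returns the highest one. */
    long next(int n) {
        if (n < 1) {
            throw new IllegalArgumentException("n must be > 0");
        }
        long current;
        long next;
        do {
            current = cursor.get();
            next = current + n;
            // Retry if another producer moved the cursor between the read and the CAS.
        } while (!cursor.compareAndSet(current, next));
        return next;
    }

    long cursor() {
        return cursor.get();
    }
}
```

Because only one thread can win each compareAndSet, no two producers ever receive the same sequence, however many threads call next concurrently.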

publish() – Publish event

public void publish(final long sequence) {
    setAvailable(sequence);
    waitStrategy.signalAllWhenBlocking();
}
private void setAvailable(final long sequence) {
    setAvailableBufferValue(calculateIndex(sequence), calculateAvailabilityFlag(sequence));
}
private int calculateIndex(final long sequence) { return (int) sequence & indexMask; }
private int calculateAvailabilityFlag(final long sequence) { return (int) (sequence >>> indexShift); }
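
The flag stored per slot is the number of times the sequence has wrapped around the buffer, which is how a consumer can tell a freshly published slot from a stale one. The sketch below shows the same idea with AtomicIntegerArray instead of Unsafe; AvailabilityFlags is a hypothetical class.

```java
import java.util.concurrent.atomic.AtomicIntegerArray;

class AvailabilityFlags {
    private final AtomicIntegerArray available;
    private final int indexMask;
    private final int indexShift;

    /** bufferSize is assumed to be a power of 2. */
    AvailabilityFlags(int bufferSize) {
        available = new AtomicIntegerArray(bufferSize);
        indexMask = bufferSize - 1;
        indexShift = Integer.numberOfTrailingZeros(bufferSize); // log2 for powers of 2
        for (int i = 0; i < bufferSize; i++) {
            available.set(i, -1); // -1 means "never published"
        }
    }

    void publish(long sequence) {
        // Slot index = low bits of the sequence, flag = how many laps it has completed.
        available.lazySet((int) sequence & indexMask, (int) (sequence >>> indexShift));
    }

    boolean isAvailable(long sequence) {
        return available.get((int) sequence & indexMask) == (int) (sequence >>> indexShift);
    }
}
```

With 4 slots, sequences 0 and 4 share slot 0 but carry flags 0 and 1. After publish(0), isAvailable(0) is true while isAvailable(4) stays false until sequence 4 itself is published.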

Differences between MultiProducerSequencer and SingleProducerSequencer

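SingleProducerSequencer keeps cachedValue and nextValue as plain padded fields and publishes by moving the cursor, so it is fast but not thread-safe. MultiProducerSequencer claims sequences with a CAS on the shared cursor, which makes it safe for multiple producers. Since producers may then finish out of order, it records publication per slot in availableBuffer rather than relying on the cursor alone.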

RingBuffer

EventSequencer

public interface EventSequencer<T> extends DataProvider<T>, Sequenced { }

DataProvider

public interface DataProvider<T> {
    T get(long sequence);
}

EventSink – Publishing methods

EventSink defines various publish methods that use EventTranslators to initialise events before publishing.

RingBufferPad – Cache‑line padding

RingBufferFields – Core storage logic

abstract class RingBufferFields<E> extends RingBufferPad {
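    // BUFFER_PAD, REF_ARRAY_BASE and REF_ELEMENT_SHIFT are assigned in a static initializer (omitted here).
    private static final int BUFFER_PAD;
    private static final long REF_ARRAY_BASE;
    private static final int REF_ELEMENT_SHIFT;
    private static final Unsafe UNSAFE = Util.getUnsafe();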
    private final long indexMask;
    private final Object[] entries;
    protected final int bufferSize;
    protected final Sequencer sequencer;
    RingBufferFields(EventFactory<E> eventFactory, Sequencer sequencer) {
        this.sequencer = sequencer;
        this.bufferSize = sequencer.getBufferSize();
        if (bufferSize < 1) throw new IllegalArgumentException("bufferSize must not be less than 1");
        if (Integer.bitCount(bufferSize) != 1) throw new IllegalArgumentException("bufferSize must be a power of 2");
        this.indexMask = bufferSize - 1;
        this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
        fill(eventFactory);
    }
    private void fill(EventFactory<E> eventFactory) {
        for (int i = 0; i < bufferSize; i++) {
            entries[BUFFER_PAD + i] = eventFactory.newInstance();
        }
    }
    @SuppressWarnings("unchecked")
    protected final E elementAt(long sequence) {
        return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));
    }
}
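
The pre-allocation idea can be shown without Unsafe or padding. In the sketch below, PreallocatedRing is a hypothetical class and Supplier takes the role of EventFactory.

```java
import java.util.function.Supplier;

class PreallocatedRing<E> {
    private final Object[] entries;
    private final int mask;

    PreallocatedRing(int bufferSize, Supplier<E> factory) {
        if (bufferSize < 1 || Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        entries = new Object[bufferSize];
        mask = bufferSize - 1;
        // Every slot is filled once, up front; publishing later only mutates these objects.
        for (int i = 0; i < bufferSize; i++) {
            entries[i] = factory.get();
        }
    }

    @SuppressWarnings("unchecked")
    E get(long sequence) {
        return (E) entries[(int) (sequence & mask)];
    }
}
```

For a ring of 4 slots, get(1) and get(5) return the same object instance. Producers overwrite its fields instead of allocating a new event, which is why steady-state publishing creates no garbage.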

SequenceBarrier – Consumer usage

public interface SequenceBarrier {
    long waitFor(long sequence) throws AlertException, InterruptedException, TimeoutException;
    long getCursor();
    boolean isAlerted();
    void alert();
    void clearAlert();
    void checkAlert() throws AlertException;
}

ProcessingSequenceBarrier

final class ProcessingSequenceBarrier implements SequenceBarrier {
    private final WaitStrategy waitStrategy;
    private final Sequence dependentSequence;
    private volatile boolean alerted = false;
    private final Sequence cursorSequence;
    private final Sequencer sequencer;
    ProcessingSequenceBarrier(Sequencer sequencer, WaitStrategy waitStrategy, Sequence cursorSequence, Sequence[] dependentSequences) {
        this.sequencer = sequencer;
        this.waitStrategy = waitStrategy;
        this.cursorSequence = cursorSequence;
        this.dependentSequence = (dependentSequences.length == 0) ? cursorSequence : new FixedSequenceGroup(dependentSequences);
    }
    @Override
    public long waitFor(final long sequence) throws AlertException, InterruptedException, TimeoutException {
        checkAlert();
        long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
        if (availableSequence < sequence) return availableSequence;
        return sequencer.getHighestPublishedSequence(sequence, availableSequence);
    }
    @Override public long getCursor() { return dependentSequence.get(); }
    @Override public boolean isAlerted() { return alerted; }
    @Override public void alert() { alerted = true; waitStrategy.signalAllWhenBlocking(); }
    @Override public void clearAlert() { alerted = false; }
    @Override public void checkAlert() throws AlertException { if (alerted) throw AlertException.INSTANCE; }
}
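
With multiple producers the cursor may be ahead of slots that are not yet published, so waitFor asks the sequencer for the highest contiguous published sequence. One plausible shape for that scan is sketched below, with a LongPredicate standing in for isAvailable; PublishedScan is a hypothetical name.

```java
import java.util.function.LongPredicate;

class PublishedScan {
    /**
     * Returns the highest sequence in [lowerBound, availableSequence] for which every
     * sequence from lowerBound onward is published, or lowerBound - 1 if none is.
     */
    static long highestPublished(long lowerBound, long availableSequence, LongPredicate isAvailable) {
        for (long sequence = lowerBound; sequence <= availableSequence; sequence++) {
            if (!isAvailable.test(sequence)) {
                return sequence - 1; // stop at the first gap
            }
        }
        return availableSequence;
    }
}
```

If sequences 0, 1 and 3 are published but 2 is still being written, a consumer waiting from 0 is told that 1 is the highest safe sequence, even though the cursor has already reached 3.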

EventProcessor – Event handling interface

public interface EventProcessor extends Runnable {
    Sequence getSequence();
    void halt();
    boolean isRunning();
}

BatchEventProcessor – Single‑threaded processing

public final class BatchEventProcessor<T> implements EventProcessor {
    // run method omitted for brevity – it clears alerts, notifies start, processes events in batches using the SequenceBarrier, handles timeouts, alerts and exceptions, then notifies shutdown.
}
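
The batching behaviour described in the comment above can be reduced to a small loop. This is a sketch only: BatchLoop and drain are hypothetical names, and alerts, timeouts and exception handling are left out.

```java
import java.util.function.LongConsumer;

class BatchLoop {
    private long sequence = -1L; // last sequence this consumer has finished

    /**
     * Handles every sequence from the next unprocessed one up to availableSequence
     * (the value a barrier would return) and returns the size of the batch.
     */
    int drain(long availableSequence, LongConsumer handler) {
        long next = sequence + 1;
        int processed = 0;
        while (next <= availableSequence) {
            handler.accept(next);
            next++;
            processed++;
        }
        if (processed > 0) {
            sequence = availableSequence; // progress is recorded once per batch
        }
        return processed;
    }

    long sequence() {
        return sequence;
    }
}
```

A consumer that falls behind catches up in one pass: if the barrier reports sequence 2 while the consumer is at -1, the batch covers 0, 1 and 2, and the consumer's own sequence is written once at the end.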

WorkProcessor – Multi‑threaded work mode

public void run() {
    if (!running.compareAndSet(false, true)) throw new IllegalStateException("Thread is already running");
    sequenceBarrier.clearAlert();
    notifyStart();
    boolean processedSequence = true;
    long cachedAvailableSequence = Long.MIN_VALUE;
    long nextSequence = sequence.get();
    T event = null;
    while (true) {
        try {
            if (processedSequence) {
                processedSequence = false;
                do {
                    nextSequence = workSequence.get() + 1L;
                    sequence.set(nextSequence - 1L);
                } while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));
            }
            if (cachedAvailableSequence >= nextSequence) {
                event = ringBuffer.get(nextSequence);
                workHandler.onEvent(event);
                processedSequence = true;
            } else {
                cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence);
            }
        } catch (TimeoutException e) {
            notifyTimeout(sequence.get());
        } catch (AlertException ex) {
            if (!running.get()) break;
        } catch (Throwable ex) {
            exceptionHandler.handleEventException(ex, nextSequence, event);
            processedSequence = true;
        }
    }
    notifyShutdown();
    running.set(false);
}

WorkerPool

Combines multiple WorkProcessors; maintains a shared workSequence that ensures each event is processed by only one worker.
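
The effect of the shared workSequence can be checked with plain threads. In this sketch SharedWorkSequence and process are hypothetical names, and an AtomicLong stands in for the shared Sequence.

```java
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.concurrent.atomic.AtomicLong;

class SharedWorkSequence {
    /** Runs `workers` threads over `events` sequences; returns how often each one was handled. */
    static AtomicIntegerArray process(int workers, int events) throws InterruptedException {
        AtomicLong workSequence = new AtomicLong(-1L);
        AtomicIntegerArray handled = new AtomicIntegerArray(events);
        Thread[] threads = new Thread[workers];
        for (int w = 0; w < workers; w++) {
            threads[w] = new Thread(() -> {
                while (true) {
                    long current = workSequence.get();
                    long next = current + 1;
                    if (next >= events) {
                        return; // nothing left to claim
                    }
                    if (workSequence.compareAndSet(current, next)) {
                        handled.incrementAndGet((int) next); // this worker owns `next`
                    }
                }
            });
            threads[w].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        return handled;
    }
}
```

Every entry of the returned array ends up at exactly 1: a worker handles a sequence only after winning the CAS for it, so no event is processed twice and none is skipped.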

WaitStrategy – Waiting policies

BlockingWaitStrategy – uses locks, low CPU, higher latency.

BusySpinWaitStrategy – busy‑spins, low latency, high CPU.

LiteBlockingWaitStrategy – a variant of BlockingWaitStrategy that tries to skip the wake-up signal when no consumer is waiting.

SleepingWaitStrategy – mixes spin, yield, and sleep.

TimeoutBlockingWaitStrategy – blocks with timeout, throws on timeout.

YieldingWaitStrategy – spins then yields.

PhasedBackoffWaitStrategy – spin, then yield, then fallback strategy.
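
To make the CPU-versus-latency trade-off concrete, here is a sketch of a wait loop that spins first, then yields, then parks briefly. BackoffWait is a hypothetical class, and the retry counts and park time are illustrative values, not the library's settings.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;

class BackoffWait {
    /** Waits until cursor >= sequence and returns the cursor value that was observed. */
    static long waitFor(long sequence, AtomicLong cursor) {
        int counter = 200;
        long available;
        while ((available = cursor.get()) < sequence) {
            if (counter > 100) {
                counter--;                    // phase 1: busy spin, lowest latency
            } else if (counter > 0) {
                counter--;
                Thread.yield();               // phase 2: give other threads a chance
            } else {
                LockSupport.parkNanos(100L);  // phase 3: back off to save CPU
            }
        }
        return available;
    }
}
```

A strategy that never leaves phase 1 behaves like busy spinning, while a strategy that blocks on a lock and condition immediately gives the lowest CPU usage and the highest wake-up latency. The other strategies in the list sit between these two ends.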

Practical multi‑threaded consumer example

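public static void main(String[] args) {
    RingBuffer<TradeBO> ringBuffer = RingBuffer.createSingleProducer(TradeBO::new, 2);
    // ConsumerC and ConsumerD are WorkHandler<TradeBO> implementations (not shown);
    // each event is handled by exactly one of them.
    WorkerPool<TradeBO> workerPool = new WorkerPool<>(
        ringBuffer,
        ringBuffer.newBarrier(),
        new IgnoreExceptionHandler(),
        new ConsumerC(), new ConsumerD()
    );
    // Gate the producer on the workers so it cannot overwrite unprocessed slots.
    ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
    ExecutorService executor = Executors.newFixedThreadPool(4);
    workerPool.start(executor);
    for (int i = 0; i < 4; i++) {
        final int id = i;
        EventTranslator<TradeBO> translator = (event, seq) -> {
            event.setId(id);
            event.setPrice((double) id);
        };
        ringBuffer.publishEvent(translator);
        System.out.println("Publish[" + id + "]");
    }
    // Let the workers finish the published events, then stop them and the pool so the JVM can exit.
    workerPool.drainAndHalt();
    executor.shutdown();
}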

DSL – Defining consumer dependencies

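Consumers can be chained so that one handler sees an event only after another has finished with it. The snippet below uses the early DisruptorWizard-style API (consumeWith, createProducerBarrier); in Disruptor 3.x the corresponding calls are handleEventsWith and after(...).handleEventsWith(...), and events are published through the RingBuffer: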

dw.consumeWith(handler1a, handler2a);
dw.after(handler1a).consumeWith(handler1b);
dw.after(handler2a).consumeWith(handler2b);
dw.after(handler1b, handler2b).consumeWith(handler3);
ProducerBarrier producerBarrier = dw.createProducerBarrier();

Written by

Programmer DD

A tinkering programmer and author of "Spring Cloud Microservices in Action"
