Inside RocketMQ ConsumeQueue: Design, File-Based Indexing, and RocksDB Optimization

This article provides an in‑depth technical exploration of RocketMQ 5.0's ConsumeQueue component, explaining why it is needed, its design principles, the traditional file‑based implementation, and a performance‑focused RocksDB‑based redesign, complete with code excerpts and implementation details.


Introduction

In RocketMQ 5.0, ConsumeQueue is the index structure that maps logical topic queues to physical positions in the CommitLog. It enables consumers to locate and pull messages efficiently from the massive, interleaved log.

Why ConsumeQueue Is Needed

All messages, regardless of topic, are appended sequentially to a single CommitLog. Because messages of different topics are interleaved in that log, a consumer cannot read only the messages it subscribes to without an indexing layer. ConsumeQueue provides this index by storing one CqUnit per message.

Design Principles

Each topic queue has a dedicated ConsumeQueue.

When a message is written to the CommitLog, an asynchronous task creates a CqUnit and appends it to the corresponding ConsumeQueue.

A CqUnit carries the logical queue offset, the physical offset in the CommitLog, the message size, and a tags code (a hash of the message tags, used for broker‑side filtering); the RocksDB implementation additionally stores the message's store timestamp.

Consumers record the current Offset of their consumer group; after a restart they resume consumption from that offset.

File‑Based Implementation

ConsumeQueue stores CqUnit entries in a file queue similar to the CommitLog. Each file holds a fixed number of entries (300 000 by default); at 20 bytes per entry that is roughly 5.7 MB per file. The file name is the byte offset of the first entry in that file, allowing fast location of a given logical offset.

Index entry definition (20 bytes per entry; the logical queue offset is not stored in the entry itself, it is implied by the entry's position in the file):

struct CqUnit {
    long commitLogPos; // 8 bytes: physical position in the CommitLog
    int  size;         // 4 bytes: message size
    long tagsCode;     // 8 bytes: tag hashcode, or a timestamp for scheduled messages
}

When a new message arrives, ReputMessageService reads it from the CommitLog, builds a DispatchRequest, and writes a CqUnit to the appropriate ConsumeQueue file. The current offset for each topic queue is maintained by QueueOffsetOperator under a ReentrantLock to avoid concurrent conflicts.
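A minimal sketch of the append step, assuming the 20‑byte layout above, a DispatchRequest named request, and the queue's current mapped file (the real putMessagePositionInfo also reuses a preallocated buffer and handles retries):

// Build one 20-byte entry from the DispatchRequest and append it to the
// ConsumeQueue's current mapped file.
ByteBuffer unit = ByteBuffer.allocate(20);
unit.putLong(request.getCommitLogOffset()); // physical position in CommitLog
unit.putInt(request.getMsgSize());          // message size
unit.putLong(request.getTagsCode());        // tag hashcode
unit.flip();
mappedFile.appendMessage(unit.array());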

Iterating by Offset

public ReferredIterator<CqUnit> iterateFrom(long startOffset) {
    // Resolve the mapped buffer containing startOffset; null means the
    // offset lies outside the queue's current [min, max) range.
    SelectMappedBufferResult sbr = getIndexBuffer(startOffset);
    if (sbr == null) return null;
    return new ConsumeQueueIterator(sbr);
}

The iterator reads CqUnit entries sequentially, extracts commitLogPos and size, and then fetches the actual message bytes from the CommitLog.
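A usage sketch tying the two together; consumeQueue and commitLog are assumed to be in scope, and release() matters because the iterator pins a reference‑counted mapped buffer:

ReferredIterator<CqUnit> it = consumeQueue.iterateFrom(startOffset);
if (it != null) {
    try {
        while (it.hasNext()) {
            CqUnit unit = it.next();
            // Fetch the payload from the CommitLog using the index entry.
            SelectMappedBufferResult msg =
                    commitLog.getMessage(unit.getPos(), unit.getSize());
            // ... decode and dispatch msg, then release it ...
        }
    } finally {
        it.release(); // un-pin the underlying mapped buffer
    }
}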

Finding the Correct File

public MappedFile findMappedFileByOffset(final long offset) {
    // Reject offsets outside [first file start, last file end).
    if (offset < firstMappedFile.getFileFromOffset() ||
        offset >= lastMappedFile.getFileFromOffset() + mappedFileSize) {
        LOG.warn("Offset not matched ...");
        return null;
    }
    // Expired files are deleted from the head of the queue, so the first
    // file's start offset is usually nonzero; subtract it to get the index
    // into the in-memory file list.
    int index = (int) ((offset / mappedFileSize) -
                     (firstMappedFile.getFileFromOffset() / mappedFileSize));
    return mappedFiles.get(index);
}

Timestamp Lookup

ConsumeQueue can locate the offset that corresponds to a given timestamp. The algorithm first finds the mapped file whose time range contains the timestamp, then performs a binary search inside that file to locate the nearest CqUnit.
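A sketch of the in‑file binary search, assuming buffer is the candidate file's mapped ByteBuffer and entryCount its entry count. Because a file‑based entry carries no timestamp, each probe must read the store timestamp from the CommitLog; the pickupStoreTimestamp call below stands in for that read:

long low = 0, high = entryCount - 1, target = -1;
while (low <= high) {
    long mid = (low + high) >>> 1;
    buffer.position((int) (mid * 20));  // seek to the mid entry
    long phyOffset = buffer.getLong();  // CommitLog position
    int size = buffer.getInt();         // message size
    long storeTime = commitLog.pickupStoreTimestamp(phyOffset, size);
    if (storeTime < timestamp) {
        low = mid + 1;                  // too early, search the right half
    } else {
        target = mid;                   // candidate, keep searching left
        high = mid - 1;
    }
}
// target is now the first entry at or after the timestamp, or -1 if none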

Core Message Retrieval Logic

The broker’s DefaultMessageStore.getMessage method pulls a batch of messages from a topic queue. A simplified excerpt shows the key steps:

@Override
public GetMessageResult getMessage(final String group, final String topic,
                                   final int queueId, final long offset,
                                   final int maxMsgNums, final int maxTotalMsgSize,
                                   final MessageFilter messageFilter) {
    long beginTime = this.getSystemClock().now();
    GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
    long nextBeginOffset = offset;
    long minOffset = 0, maxOffset = 0;
    GetMessageResult getResult = new GetMessageResult();
    long maxOffsetPy = this.commitLog.getMaxOffset();
    ConsumeQueueInterface consumeQueue = findConsumeQueue(topic, queueId);
    if (consumeQueue != null) {
        minOffset = consumeQueue.getMinOffsetInQueue();
        maxOffset = consumeQueue.getMaxOffsetInQueue();
        if (maxOffset > 0) {
            // Clamp the pull size to a sane upper bound.
            int maxPullSize = Math.max(maxTotalMsgSize, 100);
            if (maxPullSize > MAX_PULL_MSG_SIZE) {
                LOG.warn("The max pull size is too large maxPullSize={} topic={} queueId={}",
                         maxPullSize, topic, queueId);
                maxPullSize = MAX_PULL_MSG_SIZE;
            }
            status = GetMessageStatus.NO_MATCHED_MESSAGE;
            while (getResult.getBufferTotalSize() <= 0 && nextBeginOffset < maxOffset) {
                ReferredIterator<CqUnit> bufferConsumeQueue =
                        consumeQueue.iterateFrom(group, nextBeginOffset, maxMsgNums);
                if (bufferConsumeQueue == null) {
                    status = GetMessageStatus.OFFSET_FOUND_NULL;
                    break;
                }
                try {
                    while (bufferConsumeQueue.hasNext() && nextBeginOffset < maxOffset) {
                        CqUnit cqUnit = bufferConsumeQueue.next();
                        long phyOffset = cqUnit.getPos();
                        int size = cqUnit.getSize();
                        // Advance past this unit so the outer loop terminates
                        // even when nothing matches.
                        nextBeginOffset = cqUnit.getQueueOffset() + cqUnit.getBatchNum();
                        // (filter matching elided in this excerpt)
                        SelectMappedBufferResult selectResult =
                                this.commitLog.getMessage(phyOffset, size);
                        if (selectResult == null) {
                            continue; // entry points past the readable CommitLog
                        }
                        getResult.addMessage(selectResult, cqUnit.getQueueOffset(), cqUnit.getBatchNum());
                        status = GetMessageStatus.FOUND;
                    }
                } finally {
                    bufferConsumeQueue.release();
                }
            }
        }
    } else {
        status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
        nextBeginOffset = nextOffsetCorrection(offset, 0);
    }
    getResult.setStatus(status);
    getResult.setNextBeginOffset(nextBeginOffset);
    getResult.setMaxOffset(maxOffset);
    getResult.setMinOffset(minOffset);
    return getResult;
}

Key points:

The ConsumeQueue.iterateFrom method creates an iterator based on the start offset.

The iterator yields CqUnit objects in order, allowing sequential reads.

Each CqUnit provides the physical offset and size needed to fetch the message payload from the CommitLog.
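Putting it together, a usage sketch from the pull path (parameter values are illustrative):

GetMessageResult result = messageStore.getMessage(
        "groupA", "TopicTest", 0 /* queueId */, committedOffset,
        32 /* maxMsgNums */, 256 * 1024 /* maxTotalMsgSize */, null /* no filter */);
if (result.getStatus() == GetMessageStatus.FOUND) {
    // Persist nextBeginOffset as the consumer group's new progress.
    long next = result.getNextBeginOffset();
}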

RocksDB‑Based Optimization

When the number of topics grows, the file‑based approach incurs high I/O due to many small files and random reads. RocketMQ can store the index in RocksDB, a high‑performance key‑value store, reducing file count and improving random‑read performance.

Data Model

DefaultColumnFamily: stores CqUnit values; the key combines topic, queueId, and offset.

OffsetColumnFamily: stores the maxOffset and minOffset of each topic queue.

Key example (simplified):

byte[] key = buildCQKey(topicBytes, queueId, offset);

Value example (includes timestamp):

byte[] value = serialize(new CqUnit(queueOffset, commitLogPos, size, storeTimestamp));
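The exact byte layout is an implementation detail; the sketch below shows a plausible key builder, not the actual on‑disk format. Fields are written big‑endian so that, under RocksDB's default bytewise comparator, all entries of one queue sort contiguously by offset:

// Illustrative layout: topicLen(4) + topicBytes + queueId(4) + offset(8).
// ByteBuffer writes big-endian by default, which preserves numeric ordering
// in the byte-sorted keyspace.
static byte[] buildCQKey(byte[] topicBytes, int queueId, long offset) {
    ByteBuffer buf = ByteBuffer.allocate(4 + topicBytes.length + 4 + 8);
    buf.putInt(topicBytes.length).put(topicBytes).putInt(queueId).putLong(offset);
    return buf.array();
}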

Writing Index Entries

private boolean putMessagePosition0(List<DispatchRequest> requests) {
    try (WriteBatch batch = new WriteBatch()) {
        // Iterate newest-first: the first entry seen for a queue carries that
        // queue's max offset, so older entries can skip the offset update.
        for (int i = requests.size() - 1; i >= 0; i--) {
            DispatchRequest req = requests.get(i);
            // build the CqUnit value and put it into DefaultColumnFamily
            // record the queue's max offset in OffsetColumnFamily
        }
        // One atomic write covers the whole batch of dispatch requests.
        rocksDBStorage.batchPut(batch);
        return true;
    } catch (Exception e) {
        LOG.error("putMessagePosition0 failed.", e);
        return false;
    }
}

Iterating by Offset in RocksDB

public ReferredIterator<CqUnit> iterateFrom(String group, long startIndex, int count) throws RocksDBException {
    long max = getMaxOffsetInQueue();
    // Never read past the queue's current max offset.
    int num = Math.min((int) (max - startIndex), count);
    if (useMultiGet(num)) {
        return iterateUseMultiGet(startIndex, num);
    } else {
        return iterateUseScan(startIndex, num);
    }
}
iterateUseMultiGet builds the list of keys and calls RocksDB's multiGet to fetch the whole batch of values in one round trip; iterateUseScan creates a RocksIterator with an upper bound and scans sequentially.

Timestamp Lookup with RocksDB

Because each CqUnit stored in RocksDB already contains the store timestamp, the binary‑search algorithm can compare timestamps directly without reading the CommitLog, greatly improving lookup speed.

Range Query (MultiGet)

public List<ByteBuffer> rangeQuery(final String topic, final int queueId,
                                 final long startIndex, final int num) throws RocksDBException {
    byte[] topicBytes = topic.getBytes(StandardCharsets.UTF_8);
    List<ColumnFamilyHandle> cfList = new ArrayList<>(num);
    ByteBuffer[] resultList = new ByteBuffer[num];
    List<Integer> kvIndexList = new ArrayList<>(num);
    List<byte[]> kvKeyList = new ArrayList<>(num);
    for (int i = 0; i < num; i++) {
        ByteBuffer keyBB = buildCQKeyByteBuffer(topicBytes, queueId, startIndex + i);
        kvIndexList.add(i);
        kvKeyList.add(keyBB.array());
        cfList.add(this.defaultCFH);
    }
    List<byte[]> kvValueList = this.rocksDBStorage.multiGet(cfList, kvKeyList);
    for (int i = 0; i < kvValueList.size(); i++) {
        byte[] value = kvValueList.get(i);
        if (value != null) {
            resultList[kvIndexList.get(i)] = ByteBuffer.wrap(value);
        }
    }
    List<ByteBuffer> bbValueList = new ArrayList<>(resultList.length);
    for (ByteBuffer buf : resultList) {
        if (buf == null) break; // keep the result contiguous: stop at the first gap
        bbValueList.add(buf);
    }
    return bbValueList;
}

Scan Iterator

public RocksIterator scanQuery(final String topic, final int queueId,
                               final long startIndex, ReadOptions readOptions) throws RocksDBException {
    ByteBuffer beginKey = getSeekKey(topic, queueId, startIndex);
    // Bound the scan to this queue's key range so the iterator cannot walk
    // into the next topic/queue's entries.
    if (readOptions.iterateUpperBound() == null) {
        ByteBuffer upperKey = getUpperKeyForInitScanner(topic, queueId);
        byte[] buf = new byte[upperKey.remaining()];
        upperKey.slice().get(buf);
        readOptions.setIterateUpperBound(new Slice(buf));
    }
    RocksIterator iterator = this.rocksDBStorage.scan(readOptions);
    iterator.seek(beginKey.slice());
    return iterator;
}
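A usage sketch that drains the iterator; RocksIterator and ReadOptions both wrap native handles and must be closed:

try (ReadOptions ro = new ReadOptions()) {
    RocksIterator it = scanQuery(topic, queueId, startIndex, ro);
    try {
        for (int i = 0; i < num && it.isValid(); i++, it.next()) {
            byte[] value = it.value(); // serialized CqUnit
            // ... deserialize the CqUnit and hand it to the caller ...
        }
    } finally {
        it.close();
    }
}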

Timestamp Lookup in RocksDB

public long getOffsetInQueueByTime(String topic, int queueId, long timestamp,
                                   BoundaryType boundaryType) throws RocksDBException {
    long minPhysicOffset = this.messageStore.getMinPhyOffset();
    long low = this.rocksDBConsumeQueueOffsetTable.getMinCqOffset(topic, queueId);
    Long high = this.rocksDBConsumeQueueOffsetTable.getMaxCqOffset(topic, queueId);
    if (high == null || high == -1) {
        return 0;
    }
    return this.rocksDBConsumeQueueTable.binarySearchInCQByTime(topic, queueId,
                                                               high, low, timestamp,
                                                               minPhysicOffset, boundaryType);
}

Since the timestamp is stored inside the CqUnit value, the binary search compares the target timestamp directly with the stored timestamps, avoiding an extra CommitLog read.
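A sketch of that search loop, under two assumptions not shown in the source: rocksDBStorage.get is a point‑read wrapper over RocksDB's get, and TIMESTAMP_POS is a hypothetical constant marking where the timestamp sits inside the value bytes:

long low = minOffset, high = maxOffset, result = -1;
while (low <= high) {
    long mid = (low + high) >>> 1;
    // Each probe is a single RocksDB point read; no CommitLog access needed.
    byte[] value = rocksDBStorage.get(defaultCFH, buildCQKey(topicBytes, queueId, mid));
    long storeTime = ByteBuffer.wrap(value).getLong(TIMESTAMP_POS); // hypothetical field position
    if (storeTime < timestamp) {
        low = mid + 1;
    } else {
        result = mid;  // first entry at or after the target timestamp
        high = mid - 1;
    }
}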

Benefits of the RocksDB Implementation

Reduces the number of files: index data lives as KV pairs in a single RocksDB instance instead of a file queue per topic queue.

Improves random‑read performance: RocksDB's block cache, bloom filters, and sorted SST files make point lookups and bounded range scans efficient.

Enables efficient timestamp‑based queries without accessing the CommitLog.

Scales to millions of topic queues, which is difficult for the file‑based design.
