How RocketMQ Stores Messages: Deep Dive into CommitLog, ConsumeQueue, and IndexFile

This article provides a comprehensive technical analysis of Apache RocketMQ's storage subsystem, detailing the architecture of CommitLog, ConsumeQueue, and IndexFile, the use of PageCache and mmap, the message write path, flush strategies, and read mechanisms, complete with code excerpts and diagrams.

vivo Internet Technology

Storage Architecture

RocketMQ writes the messages of all topics into a single logical CommitLog, which is split on disk into a sequence of fixed-size files. Two auxiliary indexes, ConsumeQueue and IndexFile, provide fast lookup.

Figure: RocketMQ storage architecture diagram

CommitLog: Stores the raw message body and metadata. Files are 1 GB by default, and each is named with the 20-digit zero-padded offset of its first byte (e.g., 00000000000000000000 for the first file and 00000000001073741824 for the second).

ConsumeQueue: Fixed-length index (20 bytes per entry) storing the physical offset in CommitLog, the message size, and the tag hashcode. Each file holds 300,000 entries (300,000 × 20 bytes, about 5.72 MB).

IndexFile: Supports lookup by message key within a time range. Each file is about 400 MB and can store up to about 20 million index entries, using a layout similar to a Java HashMap.
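The file names and sizes quoted above follow from simple arithmetic, sketched below. The class and method names are hypothetical; the 40-byte IndexFile header and the 5 million hash slots are assumptions based on the commonly cited defaults and are not stated in the text above.

```java
import java.util.Locale;

final class StoreLayout {
    static final long COMMIT_LOG_FILE_SIZE = 1024L * 1024 * 1024; // 1 GB default
    static final int CQ_ENTRY_SIZE = 20;                          // bytes per ConsumeQueue entry
    static final int CQ_ENTRIES_PER_FILE = 300_000;

    // Assumed IndexFile defaults (not given in the article text).
    static final int INDEX_HEADER_SIZE = 40;
    static final int HASH_SLOT_NUM = 5_000_000;
    static final int HASH_SLOT_SIZE = 4;
    static final int INDEX_NUM = 20_000_000;
    static final int INDEX_ENTRY_SIZE = 20;

    /** Name of the CommitLog file that contains the given physical offset. */
    static String commitLogFileName(long physicalOffset) {
        long fileStart = physicalOffset - (physicalOffset % COMMIT_LOG_FILE_SIZE);
        return String.format(Locale.ROOT, "%020d", fileStart);
    }

    /** Size in bytes of one ConsumeQueue file. */
    static long consumeQueueFileSize() {
        return (long) CQ_ENTRY_SIZE * CQ_ENTRIES_PER_FILE;
    }

    /** Size in bytes of one IndexFile: header + slot table + index entries. */
    static long indexFileSize() {
        return INDEX_HEADER_SIZE
                + (long) HASH_SLOT_NUM * HASH_SLOT_SIZE
                + (long) INDEX_NUM * INDEX_ENTRY_SIZE;
    }
}
```

Under these assumptions a ConsumeQueue file is 6,000,000 bytes and an IndexFile is 420,000,040 bytes, which matches the approximate figures of 5.72 MB and 400 MB when expressed in binary megabytes.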

PageCache and mmap

PageCache: Kernel cache that buffers file I/O. Writes first land in PageCache and are flushed to disk asynchronously.

mmap: Maps a file directly into the process address space, eliminating copies between kernel and user buffers. RocketMQ uses Java NIO FileChannel.map() for this purpose.
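As a minimal illustration of the mapping mechanism, the sketch below maps a temporary file with FileChannel.map(), writes through the mapping, and reads the bytes back through ordinary file I/O. The class name and the temporary file are hypothetical and unrelated to RocketMQ's own classes.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class MmapDemo {
    /** Writes text through a memory mapping and reads it back from the file. */
    static String writeAndReadBack(String text) {
        byte[] payload = text.getBytes(StandardCharsets.UTF_8);
        try {
            Path file = Files.createTempFile("mmap-demo", ".bin");
            file.toFile().deleteOnExit();
            try (FileChannel channel = FileChannel.open(
                    file, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
                // Mapping past the current end of file grows the file to the mapped size.
                MappedByteBuffer buffer =
                        channel.map(FileChannel.MapMode.READ_WRITE, 0, payload.length);
                buffer.put(payload); // the write goes to the mapped pages (PageCache)
                buffer.force();      // ask the OS to write the dirty pages to disk
            }
            return new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Calling MmapDemo.writeAndReadBack("hello") returns the same string, showing that data written through the mapping is visible to regular reads of the file.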

Message Write Path

The core write flow is implemented in CommitLog.asyncPutMessage. It performs three logical steps: acquire a MappedFile, append the message to the buffer, and submit a flush request.

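public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
    AppendMessageResult result; // declared outside the try block so it stays in scope afterwards
    putMessageLock.lock(); // spin lock or ReentrantLock, depending on config
    try {
        // Step 1: acquire the MappedFile that is currently being written
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
        // Step 2: append the serialized message to the mapped buffer
        result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
    } finally {
        putMessageLock.unlock();
    }
    // Step 3: submit the flush request once the lock has been released
    CompletableFuture<PutMessageStatus> flushResult = submitFlushRequest(result, msg);
    return ...; // elided in this excerpt
}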

MappedFile Initialization

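An AllocateMappedFileService thread pre-creates the next file, and the one after it, so that a writer reaching the end of the current file does not pay the file-creation latency.

public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
    // request the file that is needed now
    AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
    requestTable.putIfAbsent(nextFilePath, nextReq);
    // request the following file ahead of time
    AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
    requestTable.putIfAbsent(nextNextFilePath, nextNextReq);
    // simplified: this excerpt omits handing the requests to the allocation thread and waiting for it to finish
    return requestTable.get(nextFilePath).getMappedFile();
}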

File Mapping

private void init(final String fileName, final int fileSize) throws IOException {
    this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
    this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
}

File Warm‑up

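RocketMQ writes a zero byte into every OS page of the file, which forces the OS to allocate physical pages up front, and then calls mlock / madvise to keep those pages from being swapped out.

public void warmMappedFile(FlushDiskType type, int pages) {
    ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
    int flush = 0; // position of the last forced flush
    for (int i = 0, j = 0; i < this.fileSize; i += OS_PAGE_SIZE, j++) {
        byteBuffer.put(i, (byte) 0); // touch one byte per page to fault the page in
        // under synchronous flush, force dirty pages to disk every `pages` pages
        if (type == FlushDiskType.SYNC_FLUSH && (i / OS_PAGE_SIZE - flush / OS_PAGE_SIZE) >= pages) {
            flush = i;
            mappedByteBuffer.force();
        }
        // yield every 1000 pages; j counts pages, whereas i advances in page-size steps
        if (j % 1000 == 0) {
            try {
                Thread.sleep(0);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
    mappedByteBuffer.force();
    this.mlock();
}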
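The page-touching idea can be tried in isolation with the sketch below, which maps a temporary file and writes one byte per page. The class name and the 4096-byte page size are assumptions, and the sketch has no equivalent of mlock, which plain Java does not expose.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class WarmupSketch {
    static final int PAGE_SIZE = 4096; // assumed OS page size

    /** Maps a temp file of fileSize bytes, touches every page, and returns the page count. */
    static int warm(int fileSize) {
        try {
            Path file = Files.createTempFile("warmup-sketch", ".bin");
            file.toFile().deleteOnExit();
            try (FileChannel channel = FileChannel.open(
                    file, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
                MappedByteBuffer buffer =
                        channel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);
                int touched = 0;
                for (int pos = 0; pos < fileSize; pos += PAGE_SIZE) {
                    buffer.put(pos, (byte) 0); // absolute put: one write per page
                    touched++;
                }
                buffer.force(); // push the zeroed pages to disk
                return touched;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Touching one byte per page is enough because the OS allocates memory at page granularity; writing every byte would add cost without faulting in any additional pages.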

ConsumeQueue and IndexFile Construction

A background ReputMessageService thread reads newly appended entries from CommitLog and updates the auxiliary indexes.

while (!this.isStopped()) {
    Thread.sleep(1);
    doReput();
}

private void doReput() {
    DispatchRequest dr = commitLog.checkMessageAndReturnSize(...);
    if (dr.isSuccess()) {
        doDispatch(dr); // builds ConsumeQueue and IndexFile
    }
}

ConsumeQueue

Each entry is 20 bytes: 8‑byte CommitLog offset, 4‑byte size, 8‑byte tag hashcode.

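private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode, final long cqOffset) {
    // reset the reusable 20-byte buffer before writing a new entry
    byteBufferIndex.flip();
    byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
    byteBufferIndex.putLong(offset);   // 8 bytes: physical offset in CommitLog
    byteBufferIndex.putInt(size);      // 4 bytes: message size
    byteBufferIndex.putLong(tagsCode); // 8 bytes: tag hashcode
    // byte position at which entry number cqOffset must land
    final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
    MappedFile mappedFile = mappedFileQueue.getLastMappedFile(expectLogicOffset);
    return mappedFile != null && mappedFile.appendMessage(byteBufferIndex.array());
}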
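The 20-byte layout can be exercised on its own with the following sketch, which encodes one entry and decodes the entry at a given logical queue offset from a byte array. The CqEntry class is hypothetical and only mirrors the field order described above.

```java
import java.nio.ByteBuffer;

final class CqEntry {
    static final int SIZE = 20; // 8-byte offset + 4-byte size + 8-byte tag hashcode

    final long commitLogOffset;
    final int messageSize;
    final long tagsCode;

    CqEntry(long commitLogOffset, int messageSize, long tagsCode) {
        this.commitLogOffset = commitLogOffset;
        this.messageSize = messageSize;
        this.tagsCode = tagsCode;
    }

    /** Serializes this entry into exactly 20 bytes. */
    byte[] encode() {
        return ByteBuffer.allocate(SIZE)
                .putLong(commitLogOffset)
                .putInt(messageSize)
                .putLong(tagsCode)
                .array();
    }

    /** Decodes the entry stored at the given logical queue offset (entry number). */
    static CqEntry decode(byte[] queueBytes, long queueOffset) {
        // Fixed-length entries: the byte position is simply queueOffset * SIZE.
        ByteBuffer buf = ByteBuffer.wrap(queueBytes, (int) (queueOffset * SIZE), SIZE);
        return new CqEntry(buf.getLong(), buf.getInt(), buf.getLong());
    }
}
```

Because every entry has the same length, finding entry n requires no scanning, only a multiplication.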

IndexFile

IndexFile mimics a HashMap with a header, slot table, and a linked list of index items.

public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
    int keyHash = indexKeyHashMethod(key);
    int slotPos = keyHash % this.hashSlotNum;
    int absSlotPos = INDEX_HEADER_SIZE + slotPos * hashSlotSize;
    int slotValue = mappedByteBuffer.getInt(absSlotPos); // index of the previous entry in this slot
    // divide before narrowing to int: a bare (int) cast would bind tighter than the division
    int timeDiff = (int) ((storeTimestamp - this.indexHeader.getBeginTimestamp()) / 1000);
    int absIndexPos = INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize + this.indexHeader.getIndexCount() * indexSize;
    mappedByteBuffer.putInt(absIndexPos, keyHash);
    mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
    mappedByteBuffer.putInt(absIndexPos + 12, timeDiff);
    mappedByteBuffer.putInt(absIndexPos + 16, slotValue); // link to the previous entry
    mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount()); // slot now points at the newest entry
    // update header fields (begin/end timestamps, offsets, counts)
    return true;
}
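The header, slot table, and linked entries can be modelled in memory as in the sketch below, which supports both insertion and lookup. MiniIndexFile and its parameters are hypothetical; reserving entry index 0 to mean "empty slot" is an assumption made so that a zero-filled slot reads as empty.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

final class MiniIndexFile {
    private static final int HEADER_SIZE = 40; // reserved; unused in this sketch
    private static final int SLOT_SIZE = 4;
    private static final int ENTRY_SIZE = 20;  // keyHash(4) + phyOffset(8) + timeDiff(4) + prevIndex(4)

    private final int slotNum;
    private final int maxEntries;
    private final ByteBuffer buffer;
    private int entryCount = 1; // index 0 is reserved to mean "empty slot"

    MiniIndexFile(int slotNum, int maxEntries) {
        this.slotNum = slotNum;
        this.maxEntries = maxEntries;
        this.buffer = ByteBuffer.allocate(
                HEADER_SIZE + slotNum * SLOT_SIZE + (maxEntries + 1) * ENTRY_SIZE);
    }

    private static int hash(String key) {
        int h = key.hashCode();
        return h == Integer.MIN_VALUE ? 0 : Math.abs(h); // keep the hash non-negative
    }

    private int slotPos(int keyHash) {
        return HEADER_SIZE + (keyHash % slotNum) * SLOT_SIZE;
    }

    private int entryPos(int index) {
        return HEADER_SIZE + slotNum * SLOT_SIZE + index * ENTRY_SIZE;
    }

    boolean putKey(String key, long phyOffset, int timeDiffSeconds) {
        if (entryCount > maxEntries) {
            return false; // file is full
        }
        int keyHash = hash(key);
        int slot = slotPos(keyHash);
        int previousHead = buffer.getInt(slot);
        int pos = entryPos(entryCount);
        buffer.putInt(pos, keyHash);
        buffer.putLong(pos + 4, phyOffset);
        buffer.putInt(pos + 12, timeDiffSeconds);
        buffer.putInt(pos + 16, previousHead); // chain to the older entry in this slot
        buffer.putInt(slot, entryCount);       // slot head becomes the new entry
        entryCount++;
        return true;
    }

    /** Returns the physical offsets recorded for the key's hash, newest first. */
    List<Long> lookup(String key) {
        List<Long> offsets = new ArrayList<>();
        int keyHash = hash(key);
        int index = buffer.getInt(slotPos(keyHash));
        while (index != 0) {
            int pos = entryPos(index);
            if (buffer.getInt(pos) == keyHash) {
                offsets.add(buffer.getLong(pos + 4));
            }
            index = buffer.getInt(pos + 16);
        }
        return offsets;
    }
}
```

Only the hash is stored, so two different keys with the same hash would both match in this sketch; a caller that needs exact results would have to compare the key of each fetched message.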

Flush Strategies

Synchronous flush: The broker acknowledges the producer only after the data is persisted to disk.

Asynchronous flush: The broker returns ACK as soon as the data reaches PageCache; a background thread later forces the data to disk.

Flush requests are submitted from CommitLog.submitFlushRequest and handled by three services:

GroupCommitService: Handles synchronous flushes; the put request is not completed until the flush finishes or the timeout expires.

FlushRealTimeService: Asynchronous flush without off-heap memory; wakes up periodically or on demand.

CommitRealTimeService: Asynchronous flush with off-heap (transient) memory; messages are first appended to an off-heap buffer, this service then commits them to PageCache, and the data is finally flushed to disk.

public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {
    if (FlushDiskType.SYNC_FLUSH == config.getFlushDiskType()) {
        GroupCommitRequest req = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(), config.getSyncFlushTimeout());
        groupCommitService.putRequest(req);
        return req.future();
    } else {
        if (!config.isTransientStorePoolEnable()) flushCommitLogService.wakeup();
        else commitLogService.wakeup();
        return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
    }
}
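The synchronous branch hands back a future that completes only once the flushed position has passed the end of the message. A minimal single-process sketch of that bookkeeping follows; GroupCommitSketch and its methods are hypothetical, and the flusher is simulated by calling onFlushed directly instead of running a background thread.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class GroupCommitSketch {
    private static final class Request {
        final long nextOffset; // first byte after the message that must be durable
        final CompletableFuture<Boolean> future = new CompletableFuture<>();

        Request(long nextOffset) {
            this.nextOffset = nextOffset;
        }
    }

    private final List<Request> pending = new ArrayList<>();
    private long flushedOffset = 0;

    /** Registers a flush request; the future completes when data up to nextOffset is flushed. */
    synchronized CompletableFuture<Boolean> submit(long nextOffset) {
        Request request = new Request(nextOffset);
        if (nextOffset <= flushedOffset) {
            request.future.complete(true); // already durable
        } else {
            pending.add(request);
        }
        return request.future;
    }

    /** Called after a flush reached newFlushedOffset; returns how many requests were completed. */
    synchronized int onFlushed(long newFlushedOffset) {
        flushedOffset = Math.max(flushedOffset, newFlushedOffset);
        int completed = 0;
        Iterator<Request> it = pending.iterator();
        while (it.hasNext()) {
            Request request = it.next();
            if (request.nextOffset <= flushedOffset) {
                request.future.complete(true);
                it.remove();
                completed++;
            }
        }
        return completed;
    }
}
```

One flush can complete many pending requests at once, which is the point of grouping: the cost of forcing data to disk is shared by every message written before it.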

Message Read Path

Read by Offset

The broker first locates the physical offset in ConsumeQueue, then fetches the payload from CommitLog.

public GetMessageResult getMessage(String group, String topic, int queueId, long offset, int maxMsgNums, MessageFilter filter) {
    GetMessageResult result = new GetMessageResult();
    ConsumeQueue cq = findConsumeQueue(topic, queueId);
    SelectMappedBufferResult buffer = cq.getIndexBuffer(offset);
    for (int i = 0; i < buffer.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
        // read the full 20-byte entry so the buffer position advances to the next entry
        long phyOffset = buffer.getByteBuffer().getLong();
        int size = buffer.getByteBuffer().getInt();
        long tagsCode = buffer.getByteBuffer().getLong(); // used for tag filtering, omitted in this excerpt
        SelectMappedBufferResult msg = commitLog.getMessage(phyOffset, size);
        result.addMessage(msg);
    }
    return result;
}
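The two-step lookup can be modelled with two in-memory buffers, one standing in for CommitLog and one for a single ConsumeQueue. TwoStepRead is a hypothetical class; it performs no capacity checks and stores only the message body, which is a simplification of the real record format.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class TwoStepRead {
    private static final int CQ_UNIT = 20; // bytes per ConsumeQueue entry

    private final ByteBuffer commitLog = ByteBuffer.allocate(1024);
    private final ByteBuffer consumeQueue = ByteBuffer.allocate(CQ_UNIT * 16);

    /** Appends a message body and its index entry; returns the logical queue offset. */
    long put(String body, long tagsCode) {
        byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
        long phyOffset = commitLog.position();
        commitLog.put(bytes);
        long queueOffset = consumeQueue.position() / CQ_UNIT;
        consumeQueue.putLong(phyOffset).putInt(bytes.length).putLong(tagsCode);
        return queueOffset;
    }

    /** Step 1: read the index entry. Step 2: read the body from the log. */
    String get(long queueOffset) {
        int entryPos = (int) (queueOffset * CQ_UNIT);
        long phyOffset = consumeQueue.getLong(entryPos);
        int size = consumeQueue.getInt(entryPos + 8);

        byte[] bytes = new byte[size];
        ByteBuffer view = commitLog.duplicate(); // independent position, shared content
        view.position((int) phyOffset);
        view.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```

Reading through a duplicate keeps the write position of the log untouched, so lookups do not interfere with later appends.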

Read by Key

The IndexFile is consulted to obtain the physical offsets, after which the messages are retrieved from CommitLog.

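public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin, long end) {
    QueryMessageResult queryResult = new QueryMessageResult();
    // Step 1: IndexFile lookup returns the physical offsets recorded for the key
    QueryOffsetResult offsetResult = indexService.queryOffset(topic, key, maxNum, begin, end);
    // Step 2: fetch each message from CommitLog by its physical offset
    for (Long offset : offsetResult.getPhyOffsets()) {
        SelectMappedBufferResult result = commitLog.getData(offset, false);
        queryResult.addMessage(result);
    }
    return queryResult;
}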

Key Implementation Details

All topics share a single CommitLog, guaranteeing strict sequential writes.

MappedFile pre‑allocation and warm‑up reduce file‑creation latency and avoid page‑fault penalties.

When the transient (off-heap) store pool is enabled, writes go to a DirectByteBuffer; the data is later committed to PageCache by CommitRealTimeService.

Because its entries are fixed-length, ConsumeQueue provides O(1) random access to message offsets, while IndexFile enables fast key-based queries within a time range.

Synchronous flush offers strong durability at the cost of latency; asynchronous flush maximizes throughput for latency‑tolerant workloads.

References

RocketMQ official documentation: https://github.com/apache/rocketmq/tree/master/docs/cn
