Inside RocketMQ ConsumeQueue: Design, File-Based Indexing, and RocksDB Optimization
This article provides an in‑depth technical exploration of RocketMQ 5.0's ConsumeQueue component, explaining why it is needed, its design principles, the traditional file‑based implementation, and a performance‑focused RocksDB‑based redesign, complete with code excerpts and implementation details.
Introduction
In RocketMQ 5.0, ConsumeQueue is the index structure that maps logical topic queues to physical positions in the CommitLog. It enables consumers to locate and pull messages efficiently from the massive, interleaved log.
Why ConsumeQueue Is Needed
All messages of all topics are appended to a single CommitLog file queue. Because messages of different topics are mixed, a consumer cannot read only the messages it subscribes to without an indexing layer. ConsumeQueue provides this index by storing a CqUnit for each message.
Design Principles
Each topic queue has a dedicated ConsumeQueue.
When a message is written to the CommitLog, an asynchronous task creates a CqUnit and appends it to the corresponding ConsumeQueue.
A CqUnit contains the logical queue offset, the physical offset in the CommitLog, the message size, and optionally a store timestamp.
Consumers record the current offset of their consumer group; after a restart they resume consumption from that offset.
File‑Based Implementation
ConsumeQueue stores CqUnit entries in a file queue similar to the CommitLog. Each file holds a fixed number of entries (300 000 by default); since each entry is 20 bytes, a file is about 5.7 MB (the 1 GB default applies to CommitLog files, not ConsumeQueue files). The file name is the byte offset of the first CqUnit in that file, so the file containing a given logical offset can be located by arithmetic alone.
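Because entries are fixed-size, mapping a logical offset to a file and an in-file position is pure arithmetic. A minimal sketch of that calculation (class and method names are illustrative, not the actual RocketMQ code):

```java
public class CqFileLocator {
    // Fixed-size index entries make offset -> file mapping pure arithmetic.
    static final int CQ_UNIT_SIZE = 20;          // bytes per index entry
    static final int ENTRIES_PER_FILE = 300_000; // entries per ConsumeQueue file
    static final int FILE_SIZE = CQ_UNIT_SIZE * ENTRIES_PER_FILE;

    /** File name = byte offset of the first entry stored in that file. */
    static long fileNameFor(long queueOffset) {
        long byteOffset = queueOffset * CQ_UNIT_SIZE;
        return byteOffset - (byteOffset % FILE_SIZE);
    }

    /** Byte position of the entry inside its file. */
    static int positionInFile(long queueOffset) {
        return (int) ((queueOffset * CQ_UNIT_SIZE) % FILE_SIZE);
    }

    public static void main(String[] args) {
        // Entry 300000 is the first entry of the second file.
        System.out.println(fileNameFor(300_000));    // 6000000
        System.out.println(positionInFile(300_001)); // 20
    }
}
```

This is why naming each file after its starting byte offset works: a binary search (or direct division) over the file names finds the right file without opening any of them.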
Index entry layout (20 bytes per entry):

struct CqUnit {
    long commitLogPos;  // 8 bytes: physical position in the CommitLog
    int  size;          // 4 bytes: message size
    long tagsCode;      // 8 bytes: tag hash code; doubles as a delivery timestamp for scheduled messages
}

The logical queue offset is not stored in the on-disk entry; it is implied by the entry's position within the file, and the in-memory CqUnit exposes it as queueOffset. When a new message arrives, ReputMessageService reads it from the CommitLog, builds a DispatchRequest, and writes a CqUnit to the appropriate ConsumeQueue file. The current offset for each topic queue is maintained by QueueOffsetOperator under a ReentrantLock to avoid concurrent conflicts.
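The per-queue offset bookkeeping can be sketched as follows; this is a simplified stand-in for QueueOffsetOperator, not the actual class:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

/** Simplified stand-in for QueueOffsetOperator: hands out consecutive
 *  logical offsets per topic queue under a lock. */
public class QueueOffsets {
    private final Map<String, Long> topicQueueTable = new HashMap<>();
    private final ReentrantLock lock = new ReentrantLock();

    /** Returns the next logical offset for the given "topic-queueId" key and advances it. */
    public long getAndIncrement(String topicQueueKey) {
        lock.lock();
        try {
            long current = topicQueueTable.getOrDefault(topicQueueKey, 0L);
            topicQueueTable.put(topicQueueKey, current + 1);
            return current;
        } finally {
            lock.unlock();
        }
    }
}
```

The lock guarantees that concurrent dispatch tasks never assign the same logical offset twice, which is essential because the offset determines where the entry lands in the ConsumeQueue file.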
Iterating by Offset
public ReferredIterator<CqUnit> iterateFrom(long startOffset) {
    SelectMappedBufferResult sbr = getIndexBuffer(startOffset);
    if (sbr == null) {
        return null;
    }
    return new ConsumeQueueIterator(sbr);
}

The iterator reads CqUnit entries sequentially, extracts commitLogPos and size, and then fetches the actual message bytes from the CommitLog.
Finding the Correct File
public MappedFile findMappedFileByOffset(final long offset) {
    if (offset < firstMappedFile.getFileFromOffset() ||
        offset >= lastMappedFile.getFileFromOffset() + mappedFileSize) {
        LOG.warn("Offset not matched ...");
        return null;
    }
    int index = (int) ((offset / mappedFileSize) -
        (firstMappedFile.getFileFromOffset() / mappedFileSize));
    return mappedFiles.get(index);
}

Timestamp Lookup
ConsumeQueue can locate the offset that corresponds to a given timestamp. The algorithm first finds the mapped file whose time range contains the timestamp, then performs a binary search inside that file to locate the nearest CqUnit.
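The in-file binary search can be sketched over an array of store timestamps standing in for the entries of one mapped file. This is a simplification: the real code reads the timestamp of each probed entry (from the CommitLog in the file-based design, or from the CqUnit value itself in the RocksDB design), but the search structure is the same:

```java
public class TimestampSearch {
    /**
     * Returns the index of the first entry whose timestamp is >= target,
     * or the array length if every entry is older. Timestamps are
     * non-decreasing because entries are appended in store order.
     */
    static int firstAtOrAfter(long[] storeTimestamps, long target) {
        int low = 0, high = storeTimestamps.length;
        while (low < high) {
            int mid = (low + high) >>> 1;       // probe the middle entry
            if (storeTimestamps[mid] < target) {
                low = mid + 1;                  // too old: search the right half
            } else {
                high = mid;                     // candidate: search the left half
            }
        }
        return low;
    }

    public static void main(String[] args) {
        long[] ts = {100, 200, 200, 300, 500};
        System.out.println(firstAtOrAfter(ts, 250)); // 3
    }
}
```

Because every entry is 20 bytes, the returned index converts directly to a byte position in the file, which is why no sequential scan is needed.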
Core Message Retrieval Logic
The broker’s DefaultMessageStore.getMessage method pulls a batch of messages from a topic queue. A simplified excerpt shows the key steps:
@Override
public GetMessageResult getMessage(final String group, final String topic,
                                   final int queueId, final long offset,
                                   final int maxMsgNums, final int maxTotalMsgSize,
                                   final MessageFilter messageFilter) {
    long beginTime = this.getSystemClock().now();
    GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
    long nextBeginOffset = offset;
    long minOffset = 0, maxOffset = 0;
    GetMessageResult getResult = new GetMessageResult();
    long maxOffsetPy = this.commitLog.getMaxOffset();
    ConsumeQueueInterface consumeQueue = findConsumeQueue(topic, queueId);
    if (consumeQueue != null) {
        minOffset = consumeQueue.getMinOffsetInQueue();
        maxOffset = consumeQueue.getMaxOffsetInQueue();
        if (maxOffset > 0) {
            int maxPullSize = Math.max(maxTotalMsgSize, 100);
            if (maxPullSize > MAX_PULL_MSG_SIZE) {
                LOG.warn("The max pull size is too large maxPullSize={} topic={} queueId={}",
                    maxPullSize, topic, queueId);
                maxPullSize = MAX_PULL_MSG_SIZE;
            }
            status = GetMessageStatus.NO_MATCHED_MESSAGE;
            while (getResult.getBufferTotalSize() <= 0 && nextBeginOffset < maxOffset) {
                ReferredIterator<CqUnit> bufferConsumeQueue =
                    consumeQueue.iterateFrom(group, nextBeginOffset, maxMsgNums);
                if (bufferConsumeQueue == null) {
                    break;
                }
                while (bufferConsumeQueue.hasNext() && nextBeginOffset < maxOffset) {
                    CqUnit cqUnit = bufferConsumeQueue.next();
                    long phyOffset = cqUnit.getPos();
                    int size = cqUnit.getSize();
                    SelectMappedBufferResult selectResult =
                        this.commitLog.getMessage(phyOffset, size);
                    getResult.addMessage(selectResult, cqUnit.getQueueOffset(), cqUnit.getBatchNum());
                    status = GetMessageStatus.FOUND;
                    // advance past the consumed entry (or batch)
                    nextBeginOffset = cqUnit.getQueueOffset() + cqUnit.getBatchNum();
                }
                bufferConsumeQueue.release();
            }
        }
    } else {
        status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
        nextBeginOffset = nextOffsetCorrection(offset, 0);
    }
    getResult.setStatus(status);
    getResult.setNextBeginOffset(nextBeginOffset);
    getResult.setMaxOffset(maxOffset);
    getResult.setMinOffset(minOffset);
    return getResult;
}

Key points:
The ConsumeQueue.iterateFrom method creates an iterator based on the start offset.
The iterator yields CqUnit objects in order, allowing sequential reads.
Each CqUnit provides the physical offset and size needed to fetch the message payload from the CommitLog.
RocksDB‑Based Optimization
When the number of topics grows, the file‑based approach incurs high I/O due to many small files and random reads. RocketMQ can store the index in RocksDB, a high‑performance key‑value store, reducing file count and improving random‑read performance.
Data Model
DefaultColumnFamily : stores CqUnit values. The key combines topic, queueId, and offset.
OffsetColumnFamily : stores the maxOffset and minOffset for each topic queue.
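For this layout to work with RocksDB's default byte-wise comparator, the key must sort first by topic, then by queueId, then by offset, so that all entries of one queue are contiguous and in offset order. A sketch of such an encoding (the exact layout, separator, and helper name here are assumptions, not RocketMQ's actual key format):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class CqKey {
    /**
     * Encodes (topic, queueId, offset) so that byte-wise key order matches
     * (topic, queueId, offset) order: the topic bytes, a zero-byte separator,
     * then big-endian queueId and offset (big-endian preserves numeric order).
     */
    static byte[] buildCQKey(byte[] topicBytes, int queueId, long offset) {
        ByteBuffer buf = ByteBuffer.allocate(topicBytes.length + 1 + 4 + 8);
        buf.put(topicBytes).put((byte) 0).putInt(queueId).putLong(offset);
        return buf.array();
    }

    public static void main(String[] args) {
        byte[] topic = "TopicA".getBytes(StandardCharsets.UTF_8);
        byte[] k1 = buildCQKey(topic, 0, 5);
        byte[] k2 = buildCQKey(topic, 0, 6);
        // Byte-wise comparison preserves offset order within a queue.
        System.out.println(Arrays.compareUnsigned(k1, k2) < 0); // true
    }
}
```

Keeping one queue's entries contiguous is what makes both the bounded scan and the batched multiGet below efficient.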
Key example (simplified):
byte[] key = buildCQKey(topicBytes, queueId, offset);

Value example (includes timestamp):

byte[] value = serialize(new CqUnit(queueOffset, commitLogPos, size, storeTimestamp));

Writing Index Entries
private boolean putMessagePosition0(List<DispatchRequest> requests) {
    try (WriteBatch batch = new WriteBatch()) {
        for (int i = requests.size() - 1; i >= 0; i--) {
            DispatchRequest req = requests.get(i);
            // build CqUnit and write to DefaultColumnFamily
            // update max offset in OffsetColumnFamily
        }
        rocksDBStorage.batchPut(batch);
        return true;
    } catch (Exception e) {
        LOG.error("putMessagePosition0 failed.", e);
        return false;
    }
}

Iterating by Offset in RocksDB
public ReferredIterator<CqUnit> iterateFrom(String group, long startIndex, int count) throws RocksDBException {
    long max = getMaxOffsetInQueue();
    int num = Math.min((int) (max - startIndex), count);
    if (useMultiGet(num)) {
        return iterateUseMultiGet(startIndex, num);
    } else {
        return iterateUseScan(startIndex, num);
    }
}

iterateUseMultiGet builds a list of keys and calls RocksDB's multiGet to retrieve a batch of values; iterateUseScan creates a RocksIterator with an upper bound for sequential scanning.
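The two strategies can be emulated with a sorted map standing in for RocksDB to make the difference concrete (the method names mirror the ones above, but the threshold logic and in-memory map are illustrative only):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

public class IterateStrategies {
    /** Point lookups: one get per offset, like RocksDB multiGet over built keys. */
    static List<String> useMultiGet(TreeMap<Long, String> cq, long start, int num) {
        List<String> out = new ArrayList<>(num);
        for (long i = start; i < start + num; i++) {
            String v = cq.get(i);        // each key fetched individually
            if (v != null) out.add(v);
        }
        return out;
    }

    /** Range scan: walk keys in [start, start + num), like a bounded RocksIterator. */
    static List<String> useScan(TreeMap<Long, String> cq, long start, int num) {
        return new ArrayList<>(cq.subMap(start, start + num).values());
    }

    public static void main(String[] args) {
        TreeMap<Long, String> cq = new TreeMap<>();
        for (long i = 0; i < 10; i++) cq.put(i, "unit-" + i);
        System.out.println(useMultiGet(cq, 3, 2)); // [unit-3, unit-4]
        System.out.println(useScan(cq, 3, 2));     // [unit-3, unit-4]
    }
}
```

Both return the same entries; the trade-off is access pattern: multiGet favors small batches of point lookups, while a bounded scan amortizes better over long contiguous runs.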
Timestamp Lookup with RocksDB
Because each CqUnit stored in RocksDB already contains the store timestamp, the binary‑search algorithm can compare timestamps directly without reading the CommitLog, greatly improving lookup speed.
Range Query (MultiGet)
public List<ByteBuffer> rangeQuery(final String topic, final int queueId,
                                   final long startIndex, final int num) throws RocksDBException {
    byte[] topicBytes = topic.getBytes(StandardCharsets.UTF_8);
    List<ColumnFamilyHandle> cfList = new ArrayList<>(num);
    ByteBuffer[] resultList = new ByteBuffer[num];
    List<Integer> kvIndexList = new ArrayList<>(num);
    List<byte[]> kvKeyList = new ArrayList<>(num);
    for (int i = 0; i < num; i++) {
        ByteBuffer keyBB = buildCQKeyByteBuffer(topicBytes, queueId, startIndex + i);
        kvIndexList.add(i);
        kvKeyList.add(keyBB.array());
        cfList.add(this.defaultCFH);
    }
    List<byte[]> kvValueList = this.rocksDBStorage.multiGet(cfList, kvKeyList);
    for (int i = 0; i < kvValueList.size(); i++) {
        byte[] value = kvValueList.get(i);
        if (value != null) {
            resultList[kvIndexList.get(i)] = ByteBuffer.wrap(value);
        }
    }
    List<ByteBuffer> bbValueList = new ArrayList<>(resultList.length);
    for (ByteBuffer buf : resultList) {
        if (buf == null) {
            break; // stop at the first gap so the returned range stays contiguous
        }
        bbValueList.add(buf);
    }
    return bbValueList;
}

Scan Iterator
public RocksIterator scanQuery(final String topic, final int queueId,
                               final long startIndex, ReadOptions readOptions) throws RocksDBException {
    ByteBuffer beginKey = getSeekKey(topic, queueId, startIndex);
    if (readOptions.iterateUpperBound() == null) {
        // Bound the scan to this topic queue so the iterator never crosses
        // into another queue's keys.
        ByteBuffer upperKey = getUpperKeyForInitScanner(topic, queueId);
        byte[] buf = new byte[upperKey.remaining()];
        upperKey.slice().get(buf);
        readOptions.setIterateUpperBound(new Slice(buf));
    }
    RocksIterator iterator = this.rocksDBStorage.scan(readOptions);
    iterator.seek(beginKey.slice());
    return iterator;
}

Timestamp Lookup in RocksDB
public long getOffsetInQueueByTime(String topic, int queueId, long timestamp,
                                   BoundaryType boundaryType) throws RocksDBException {
    long minPhysicOffset = this.messageStore.getMinPhyOffset();
    long low = this.rocksDBConsumeQueueOffsetTable.getMinCqOffset(topic, queueId);
    Long high = this.rocksDBConsumeQueueOffsetTable.getMaxCqOffset(topic, queueId);
    if (high == null || high == -1) {
        return 0;
    }
    return this.rocksDBConsumeQueueTable.binarySearchInCQByTime(topic, queueId,
        high, low, timestamp, minPhysicOffset, boundaryType);
}

Since the timestamp is stored inside the CqUnit value, the binary search compares the target timestamp directly with the stored timestamps, avoiding an extra CommitLog read.
Benefits of the RocksDB Implementation
Reduces the number of ConsumeQueue files; index data is stored as KV pairs instead of per‑topic files.
Improves random‑read latency because RocksDB’s LSM‑tree layout is optimized for point lookups and range scans.
Enables efficient timestamp‑based queries without accessing the CommitLog.
Scales to millions of topic queues, which is difficult for the file‑based design.
Tencent Cloud Middleware
Official account of Tencent Cloud Middleware. Focuses on microservices, messaging middleware and other cloud‑native technology trends, publishing product updates, case studies, and technical insights. Regularly hosts tech salons to share effective solutions.