How I Reengineered EQueue to Use File-Based Persistence for Faster Messaging

The article details the author's multi‑month effort to replace SQL Server persistence in the open‑source EQueue message queue with a high‑performance local file storage solution, covering the motivations, design considerations, chunked binary file format, write‑and‑read mechanisms, and flushing strategies.


Why switch to file storage?

The original EQueue persisted messages in SQL Server using asynchronous bulk inserts, which worked functionally but caused bandwidth, CPU, and memory pressure when the broker and database were on separate machines, and suffered instability under high load.

Problems with in‑memory GC

Storing recent messages in a large ConcurrentDictionary<long, Message> avoided frequent DB reads but led to massive GC pressure when the dictionary grew to millions of entries, blocking threads and reducing TPS.

Key design of file storage

To address these issues the author adopted two strategies: (1) persist messages by writing them to files, and (2) cache messages in unmanaged memory. The file design uses binary chunks with three states (New, Completed, Ongoing) and stores each message as a length prefix followed by the payload, enabling efficient forward and backward reads.
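To make the length-prefixed layout concrete, here is a minimal sketch of how such records could be appended and read back. The helper names are my own, not the actual EQueue API, and the trailing length copy is an assumption: the article only mentions a length prefix, but repeating the length after the payload is a common way to make backward reads efficient.

```csharp
using System;
using System.IO;

// Hypothetical sketch of a length-prefixed record layout.
// The trailing length copy (for backward scans) is assumed, not confirmed by the article.
static class RecordFormat
{
    public static long Append(BinaryWriter writer, byte[] payload)
    {
        var position = writer.BaseStream.Position;
        writer.Write(payload.Length);   // 4-byte length prefix
        writer.Write(payload);          // record payload
        writer.Write(payload.Length);   // 4-byte length suffix, enabling backward reads
        return position;                // log position of this record
    }

    public static byte[] ReadForward(BinaryReader reader, long position)
    {
        reader.BaseStream.Position = position;
        var length = reader.ReadInt32();
        var payload = reader.ReadBytes(length);
        reader.ReadInt32();             // skip the trailing length copy
        return payload;
    }
}
```

With this layout, a forward scan jumps from record to record using the prefix, while a backward scan reads the suffix just before the current record to find where the previous one starts.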

Queue metadata is also stored in fixed‑size 8‑byte records (MessagePosition) so the offset of a message in a queue can be calculated directly without a large in‑memory dictionary.
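Because each queue index entry is a fixed 8 bytes, the file position of the entry for any queue offset is a simple multiplication, with no dictionary lookup involved. A minimal sketch of that arithmetic (class and method names are hypothetical):

```csharp
using System;
using System.IO;

// Sketch of direct offset calculation over fixed-size 8-byte index records.
// Names are illustrative; the real EQueue types differ.
static class QueueIndex
{
    const int RecordSize = 8; // one fixed-size MessagePosition record per queue offset

    // File position of the index entry for a given queue offset.
    public static long IndexFilePosition(long queueOffset) => queueOffset * RecordSize;

    // Read the stored message log position for a given queue offset.
    public static long ReadMessagePosition(Stream indexStream, long queueOffset)
    {
        indexStream.Position = IndexFilePosition(queueOffset);
        var buffer = new byte[RecordSize];
        indexStream.Read(buffer, 0, RecordSize);
        return BitConverter.ToInt64(buffer, 0);
    }
}
```

The fixed record size is what makes the dictionary unnecessary: the queue offset itself is the index.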

Writing a message to file

// Append the message and its queue index entry atomically under a lock.
MessageStoreResult result = null;
lock (_syncObj)
{
    var queueOffset = queue.NextOffset;                       // next logical offset in this queue
    var messageRecord = _messageStore.StoreMessage(queueId, queueOffset, message);
    queue.AddMessage(messageRecord.LogPosition, message.Tag); // index entry: queue offset -> log position
    queue.IncrementNextOffset();
    result = new MessageStoreResult(messageRecord.MessageId, message.Code, message.Topic, queueId, queueOffset, message.Tag);
}

The StoreMessage method creates a MessageLogRecord and writes it via ChunkWriter:

public MessageLogRecord StoreMessage(int queueId, long queueOffset, Message message)
{
    var record = new MessageLogRecord(
        message.Topic,
        message.Code,
        message.Body,
        queueId,
        queueOffset,
        message.CreatedTime,
        DateTime.Now,
        message.Tag);
    _chunkWriter.Write(record);
    return record;
}

The queue index is added with:

public void AddMessage(long messagePosition, string messageTag)
{
    _chunkWriter.Write(new QueueLogRecord(messagePosition + 1, messageTag.GetHashcode2()));
}

The ChunkWriter.Write method handles chunk rotation, synchronization, and optional flushing:

public long Write(ILogRecord record)
{
    lock (_lockObj)
    {
        if (_isClosed) throw new ChunkWriteException(_currentChunk.ToString(), "Chunk writer is closed.");

        // Rotate to a fresh chunk if the current one has already been completed.
        if (_currentChunk.IsCompleted) _currentChunk = _chunkManager.AddNewChunk();

        var result = _currentChunk.TryAppend(record);
        if (!result.Success)
        {
            // The record did not fit: seal this chunk and retry once on a new one.
            _currentChunk.Complete();
            _currentChunk = _chunkManager.AddNewChunk();
            result = _currentChunk.TryAppend(record);
            if (!result.Success) throw new ChunkWriteException(_currentChunk.ToString(), "Write record to chunk failed.");
        }

        // With SyncFlush enabled, every write is flushed to disk before returning.
        if (_chunkManager.Config.SyncFlush) _currentChunk.Flush();

        return result.Position;
    }
}

Reading messages from files

To support concurrent reads, a pool of pre‑created ReaderWorkItem objects is maintained. When a read is needed, a reader is dequeued, used, and then returned to the pool, avoiding repeated stream creation and reducing GC pressure.

private readonly ConcurrentQueue<ReaderWorkItem> _readerWorkItemQueue = new ConcurrentQueue<ReaderWorkItem>();
private void InitializeReaderWorkItems()
{
    for (var i = 0; i < _chunkConfig.ChunkReaderCount; i++)
        _readerWorkItemQueue.Enqueue(CreateReaderWorkItem());
    _isReadersInitialized = true;
}
private ReaderWorkItem CreateReaderWorkItem()
{
    var stream = _isMemoryChunk
        ? new UnmanagedMemoryStream((byte*)_cachedData, _cachedLength)
        : new FileStream(_filename, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, _chunkConfig.ChunkReadBuffer, FileOptions.None);
    return new ReaderWorkItem(stream, new BinaryReader(stream));
}
private ReaderWorkItem GetReaderWorkItem()
{
    ReaderWorkItem readerWorkItem;
    while (!_readerWorkItemQueue.TryDequeue(out readerWorkItem)) Thread.Sleep(1);
    return readerWorkItem;
}
private void ReturnReaderWorkItem(ReaderWorkItem readerWorkItem)
{
    _readerWorkItemQueue.Enqueue(readerWorkItem);
}
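Putting the pool together with the length-prefixed record format, the read path might look like the following self-contained sketch. The read logic itself is my assumption based on the format described earlier; the actual EQueue method names and record decoding differ.

```csharp
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Threading;

// Minimal sketch of the pooled-reader read path over an in-memory chunk.
// Pooling mirrors the article's code; the record decoding is assumed.
class PooledChunkReader
{
    readonly ConcurrentQueue<BinaryReader> _readers = new ConcurrentQueue<BinaryReader>();

    public PooledChunkReader(byte[] chunkData, int readerCount)
    {
        for (var i = 0; i < readerCount; i++)
            _readers.Enqueue(new BinaryReader(new MemoryStream(chunkData, writable: false)));
    }

    public byte[] ReadRecordAt(long position)
    {
        BinaryReader reader;
        while (!_readers.TryDequeue(out reader)) Thread.Sleep(1); // borrow a pooled reader
        try
        {
            reader.BaseStream.Position = position;
            var length = reader.ReadInt32();     // 4-byte length prefix
            return reader.ReadBytes(length);     // record payload
        }
        finally
        {
            _readers.Enqueue(reader);            // always return the reader to the pool
        }
    }
}
```

Because each pooled reader owns its own stream, concurrent reads never contend on a shared file position, and no streams are created or collected on the hot path.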

Flushing strategy

After writing, data can be flushed synchronously (calling Flush on the file stream) or asynchronously via a background thread that periodically flushes, balancing throughput and durability according to configuration.
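The asynchronous option can be sketched as a background timer that flushes the active stream at a configurable interval. This is an illustrative sketch, not EQueue's actual flusher; all names here are assumptions.

```csharp
using System;
using System.IO;
using System.Threading;

// Hypothetical sketch of the asynchronous flush strategy: a background timer
// periodically flushes buffered data, trading a small durability window for throughput.
class PeriodicFlusher : IDisposable
{
    readonly Stream _stream;
    readonly Timer _timer;
    readonly object _lockObj = new object();

    public PeriodicFlusher(Stream stream, int flushIntervalMilliseconds)
    {
        _stream = stream;
        _timer = new Timer(_ => Flush(), null, flushIntervalMilliseconds, flushIntervalMilliseconds);
    }

    void Flush()
    {
        lock (_lockObj) { _stream.Flush(); } // push buffered bytes toward the OS/disk
    }

    public void Dispose()
    {
        _timer.Dispose();
        Flush(); // final flush on shutdown
    }
}
```

With synchronous flush, durability is per write; with a periodic flusher like this, a crash can lose at most the last interval's worth of writes, which is the trade-off the configuration exposes.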

Overall, the article walks through the motivations, binary chunk design, write and read implementations, and flushing options that enabled the author to replace SQL Server with efficient file‑based persistence in EQueue.

Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.

Written by 21CTO

21CTO (21CTO.com) offers developers community, training, and services, making it your go-to learning and service platform.