Optimizing Broker Restarts and Minimizing File Reads in EQueue
This article explains how EQueue handles broker restarts by scanning and initializing chunk files, introduces memory‑based caching strategies to avoid frequent file reads, and outlines message deletion, querying, consumer offset storage, and queue management techniques for high‑performance backend systems.
How to handle broker restart?
When a broker restarts, EQueue scans all chunk files under the Chunk directory. Files are named sequentially (e.g., message-chunk-000000000, message-chunk-000000001) and sorted in ascending order. Every file before the last one is in the Completed state and can be initialized by reading only its ChunkHeader. The last file may still be open for writing; its current write position is determined from a checkpoint or, alternatively, by reading from the start of the file until an invalid length or value is encountered.
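The scan-and-sort step can be sketched as follows. This is a minimal illustration, not EQueue's actual code: the ChunkScanner class, its Scan method, and the returned tuple shape are hypothetical names, and the sketch only orders the files and flags all but the last as completed. Reading the ChunkHeader and recovering the write position are left out.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Hypothetical helper: orders chunk files by their numeric suffix and
// marks every file except the last one as completed.
public static class ChunkScanner
{
    public static List<(string FilePath, bool IsCompleted)> Scan(
        string chunkDirectory, string prefix = "message-chunk-")
    {
        var ordered = new List<(long Number, string FilePath)>();
        foreach (var filePath in Directory.GetFiles(chunkDirectory, prefix + "*"))
        {
            // The part of the file name after the prefix is the chunk number.
            var suffix = Path.GetFileName(filePath).Substring(prefix.Length);
            if (long.TryParse(suffix, out var number))
                ordered.Add((number, filePath));
        }
        ordered.Sort((a, b) => a.Number.CompareTo(b.Number));

        var result = new List<(string FilePath, bool IsCompleted)>();
        for (var i = 0; i < ordered.Count; i++)
        {
            // Only the last chunk may still be receiving writes.
            result.Add((ordered[i].FilePath, i < ordered.Count - 1));
        }
        return result;
    }
}
```

Sorting by the parsed number rather than by file name keeps the order correct even if the zero padding ever changes.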
How to minimize file reads?
To avoid reading from disk, EQueue caches completed chunk files in unmanaged memory using Marshal.AllocHGlobal and accesses them via UnmanagedMemoryStream. The following method loads a chunk file into unmanaged memory:
// 'unsafe' is required because of the byte* cast below.
private unsafe void LoadFileChunkToMemory()
{
    using (var fileStream = new FileStream(_filename, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 8192, FileOptions.None))
    {
        // The int cast assumes the chunk is smaller than 2 GB (the default chunk size is 256 MB).
        var cachedLength = (int)fileStream.Length;
        var cachedData = Marshal.AllocHGlobal(cachedLength);
        try
        {
            using (var unmanagedStream = new UnmanagedMemoryStream((byte*)cachedData, cachedLength, cachedLength, FileAccess.ReadWrite))
            {
                fileStream.Seek(0, SeekOrigin.Begin);
                var buffer = new byte[65536];
                int toRead = cachedLength;
                // Copy the file into unmanaged memory in blocks of up to 64 KB.
                while (toRead > 0)
                {
                    int read = fileStream.Read(buffer, 0, Math.Min(toRead, buffer.Length));
                    if (read == 0) break; // end of file reached early
                    toRead -= read;
                    unmanagedStream.Write(buffer, 0, read);
                }
            }
        }
        catch
        {
            // Release the unmanaged block if the copy fails, then rethrow.
            Marshal.FreeHGlobal(cachedData);
            throw;
        }
        _cachedData = cachedData;
        _cachedLength = cachedLength;
    }
}
For newly created chunks, a matching unmanaged memory buffer is allocated and kept in sync with file writes. When reading, the system first checks whether the current chunk has an associated unmanaged memory buffer; if so, it reads from memory, otherwise it falls back to disk.
Because data may not be flushed to disk immediately, recent writes might be invisible to readers. To handle this, the latest N messages (default 10,000) are kept in a managed circular buffer. The buffer index is calculated as dataPosition % bufferSize, and each entry stores the position and the raw bytes. When a read request arrives, the buffer is consulted first; if the entry matches the requested position, the message is returned without disk access.
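A minimal sketch of such a position-indexed circular buffer is shown below. The RecentMessageCache class and its members are hypothetical names chosen for illustration. Only the slot formula (dataPosition % bufferSize), the default size of 10,000, and the position check come from the description above.

```csharp
using System;

// Hypothetical sketch of a fixed-size cache for the most recent messages.
public sealed class RecentMessageCache
{
    // Immutable entry, so replacing a slot is a single reference write.
    private sealed class Entry
    {
        public readonly long Position;
        public readonly byte[] Data;
        public Entry(long position, byte[] data) { Position = position; Data = data; }
    }

    private readonly Entry[] _entries;

    public RecentMessageCache(int bufferSize = 10000)
    {
        if (bufferSize <= 0) throw new ArgumentOutOfRangeException(nameof(bufferSize));
        _entries = new Entry[bufferSize];
    }

    public void Add(long dataPosition, byte[] data)
    {
        if (dataPosition < 0) throw new ArgumentOutOfRangeException(nameof(dataPosition));
        // A newer message silently overwrites whatever occupied the slot before.
        _entries[dataPosition % _entries.Length] = new Entry(dataPosition, data);
    }

    public bool TryGet(long dataPosition, out byte[] data)
    {
        data = null;
        if (dataPosition < 0) return false;
        var entry = _entries[dataPosition % _entries.Length];
        // The slot may hold a different message that maps to the same index,
        // so the stored position must match the requested one.
        if (entry == null || entry.Position != dataPosition) return false;
        data = entry.Data;
        return true;
    }
}
```

A miss in TryGet is not an error; the caller simply falls back to the unmanaged memory copy or the file.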
How to delete messages?
Messages are deleted only after all consumers have processed them. EQueue offers two deletion strategies for Message Chunks: (1) limit the number of chunk files (default 100, each 256 MB ≈ 25 GB total) and (2) time‑based retention (default 7 days). Queue Chunks are deleted when all their associated Message Chunks have been removed.
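The two strategies might be expressed roughly as below. This is a sketch under stated assumptions rather than EQueue's implementation: ChunkInfo, ChunkRetention, and the FullyConsumed flag are hypothetical, chunks are assumed to be deleted oldest-first, and the newest chunk is assumed to be exempt from the time-based rule because it may still be written to.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical description of one message chunk.
public sealed class ChunkInfo
{
    public int Number { get; }
    public DateTime LastWriteTimeUtc { get; }
    public bool FullyConsumed { get; }
    public ChunkInfo(int number, DateTime lastWriteTimeUtc, bool fullyConsumed)
    {
        Number = number; LastWriteTimeUtc = lastWriteTimeUtc; FullyConsumed = fullyConsumed;
    }
}

public static class ChunkRetention
{
    // Strategy 1: when more than maxChunkCount chunks exist, the oldest
    // excess chunks become deletable, but only while they are fully consumed.
    public static List<ChunkInfo> ByCount(IReadOnlyList<ChunkInfo> chunksAscending, int maxChunkCount = 100)
    {
        var excess = Math.Max(0, chunksAscending.Count - maxChunkCount);
        return chunksAscending.Take(excess).TakeWhile(c => c.FullyConsumed).ToList();
    }

    // Strategy 2: the oldest chunks become deletable once they are fully
    // consumed and older than maxAge (7 days when not specified).
    public static List<ChunkInfo> ByAge(IReadOnlyList<ChunkInfo> chunksAscending, DateTime nowUtc, TimeSpan? maxAge = null)
    {
        var limit = maxAge ?? TimeSpan.FromDays(7);
        return chunksAscending
            .Take(Math.Max(0, chunksAscending.Count - 1)) // never the newest chunk
            .TakeWhile(c => c.FullyConsumed && nowUtc - c.LastWriteTimeUtc > limit)
            .ToList();
    }
}
```

Using TakeWhile means deletion stops at the first chunk that still has unconsumed messages, so no gap appears in the middle of the chunk sequence.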
How to query messages?
Each message ID consists of the broker IP and the global position of the message within the broker’s files. Knowing the ID allows the system to locate the exact file and offset, enabling direct retrieval without a database.
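To illustrate how a global position can be turned into a file and an offset, here is a simplified sketch. It assumes every chunk holds exactly 256 MB of message data and ignores any header or footer bytes in the file; the MessageLocator class and its methods are hypothetical names, and the real layout may differ.

```csharp
using System;

// Hypothetical helper that maps a global position to a chunk file and local offset.
public static class MessageLocator
{
    // Assumed amount of message data per chunk (256 MB).
    public const long ChunkDataSize = 256L * 1024 * 1024;

    public static (int ChunkNumber, long LocalOffset) Locate(long globalPosition)
    {
        if (globalPosition < 0) throw new ArgumentOutOfRangeException(nameof(globalPosition));
        // Integer division selects the chunk; the remainder is the offset inside it.
        return ((int)(globalPosition / ChunkDataSize), globalPosition % ChunkDataSize);
    }

    // Builds a file name in the message-chunk-000000000 style (nine digits).
    public static string ChunkFileName(int chunkNumber)
    {
        return "message-chunk-" + chunkNumber.ToString("D9");
    }
}
```

Because the mapping is pure arithmetic, a lookup by ID needs no index or database query, only one seek in the right chunk.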
How to persist consumer offsets?
Consumer offsets are stored in a single JSON file mapping each consumer group and queue to its latest offset, e.g.:
{"SampleGroup":{"topic1-3":89055,"topic1-2":89599,"topic1-1":89471,"topic1-0":89695}}
Further optimization ideas
Since file storage already provides binary data, EQueue can return the raw bytes directly to consumers, eliminating the need to deserialize from a database and re‑serialize, thus removing a performance bottleneck.
How to monitor read/write statistics?
EQueue’s Chunk statistics service prints per‑chunk read/write metrics every second, helping operators detect latency or throughput issues.
Dynamic queue scaling and safety
Queue expansion is straightforward; deletion requires ensuring the queue is no longer visible to producers or consumers, waiting until all of its messages have been consumed, and then removing the queue’s files and metadata. Sample deletion code:
public void DeleteQueue(string topic, int queueId)
{
    lock (this)
    {
        var key = QueueKeyUtil.CreateQueueKey(topic, queueId);
        if (!_queueDict.TryGetValue(key, out var queue)) return;

        // The queue must first be hidden from both producers and consumers.
        if (queue.Setting.ProducerVisible || queue.Setting.ConsumerVisible)
            throw new Exception("Queue is visible to producer or consumer, cannot be deleted.");

        // Every message in the queue must already have been consumed.
        var minConsumedOffset = _consumeOffsetStore.GetMinConsumedOffset(topic, queueId);
        var queueCurrentOffset = queue.NextOffset - 1;
        if (minConsumedOffset < queueCurrentOffset)
            throw new Exception(string.Format("Queue is not allowed to delete, there are unconsumed messages: {0}",
                queueCurrentOffset - minConsumedOffset));

        // Remove the consumer offsets, the queue files, and the in-memory entry.
        _consumeOffsetStore.DeleteConsumeOffset(queue.Key);
        queue.Delete();
        _queueDict.Remove(key);
    }
}
Tag support
EQueue now supports tags as a second‑level filter on topics, allowing consumers to subscribe to a specific tag (e.g., "02") and receive only matching messages, reducing the need for many separate topics.
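Tag matching can be pictured with the small sketch below. The TaggedMessage and TagFilter types are hypothetical, and treating an empty tag set as "no tag filter" is an assumption of this sketch; the text above does not say where EQueue applies the filter or how an empty subscription behaves.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical message shape carrying a topic and a tag.
public sealed class TaggedMessage
{
    public string Topic { get; }
    public string Tag { get; }
    public TaggedMessage(string topic, string tag) { Topic = topic; Tag = tag; }
}

public static class TagFilter
{
    // Returns the messages of the given topic whose tag is subscribed.
    // Assumption: an empty tag set means every tag is accepted.
    public static List<TaggedMessage> Filter(
        IEnumerable<TaggedMessage> messages, string topic, ISet<string> subscribedTags)
    {
        return messages
            .Where(m => m.Topic == topic)
            .Where(m => subscribedTags.Count == 0 || subscribedTags.Contains(m.Tag))
            .ToList();
    }
}
```

For example, a consumer subscribed to tag "02" on topic1 would receive only the topic1 messages tagged "02", while messages tagged "01" are skipped.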
Message backlog alerts
The web console can send email or SMS alerts when the number of unconsumed messages exceeds a configurable threshold, enabling rapid response to potential bottlenecks.