Implementing an LSM‑Tree in Zig: Core Components, Write/Read Logic, and Compaction
The article walks through a complete Zig implementation of an LSM-Tree: an in-memory skip-list MemTable, immutable SSTables built from prefix-compressed, checksummed blocks plus a Bloom filter, write-ahead logging, the iterator hierarchy used for reads, and the multi-level compaction logic that merges and rewrites SSTables.
An LSM-Tree (Log-Structured Merge-Tree) is a high-performance key-value storage structure widely used in NoSQL databases and big-data systems. This article describes a complete implementation of the core LSM-Tree functionality (reads and writes, compression, persistence) in the Zig programming language.
1. Introduction
The LSM-Tree optimizes write throughput by relying on sequential disk writes and a multi-level hierarchy of immutable SSTables, at the cost of somewhat higher read latency. The author built this implementation to deepen their understanding of the structure.
2. Core Function Overview
The implementation covers the following components (a sketch of the storage state that ties them together follows this list):
MemTable (in-memory ordered map, implemented as a skip-list)
SSTable (immutable on‑disk file composed of blocks)
Write‑ahead log (WAL) for durability
Iterators for reading (MemTable, SSTable, MergeIterator, TwoMergeIterator)
Compaction logic to merge levels
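These components hang together through a shared storage state. The sketch below is reconstructed from the field accesses that appear in the later snippets (mem_table, imm_mem_tables, l0_sstables, levels, sstables); the exact declaration in the repository may differ:
pub const StorageState = struct {
    allocator: std.mem.Allocator,
    // The active MemTable receiving writes, plus frozen MemTables awaiting flush.
    mem_table: MemTablePtr,
    imm_mem_tables: std.ArrayList(MemTablePtr),
    // IDs of the SSTables in level 0 and in each deeper level.
    l0_sstables: std.ArrayList(usize),
    levels: std.ArrayList(std.ArrayList(usize)),
    // All live SSTables, keyed by ID (accessors such as getMemTable() omitted).
    sstables: std.AutoHashMap(usize, SsTablePtr),
};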
3. MemTable Implementation
The MemTable wraps a generic skip-list (the skip-list itself lives in src/skiplist.zig). Its core fields and the put/get paths look like this:
map: Map,
lock: RwLock,
wal: ?Wal,
id: usize,
allocator: std.mem.Allocator,
arena: std.heap.ArenaAllocator,
approximate_size: atomic.Value(usize) = atomic.Value(usize).init(0),
fn putToList(self: *Self, key: []const u8, value: []const u8) !void {
self.lock.lock();
defer self.lock.unlock();
try self.map.insert(key, value);
}
fn putToWal(self: *Self, key: []const u8, value: []const u8) !void {
if (self.wal) |w| {
var buf = std.ArrayList(u8).init(self.arena.allocator());
var bw = buf.writer();
try bw.writeInt(u32, @intCast(key.len), .big);
_ = try bw.write(key);
try bw.writeInt(u32, @intCast(value.len), .big);
_ = try bw.write(value);
try w.append(buf.items);
}
}
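// Sketch (assumption, not shown in the article): on startup, a WAL written by
// putToWal above could be replayed by decoding the same length-prefixed records
// and re-inserting them into the skip-list. The helper name is hypothetical.
fn replayWalRecord(self: *Self, bytes: []const u8) !usize {
    var pos: usize = 0;
    const key_len: usize = std.mem.readInt(u32, bytes[pos..][0..4], .big);
    pos += 4;
    const key = bytes[pos .. pos + key_len];
    pos += key_len;
    const value_len: usize = std.mem.readInt(u32, bytes[pos..][0..4], .big);
    pos += 4;
    const value = bytes[pos .. pos + value_len];
    pos += value_len;
    // A real implementation would validate the lengths against bytes.len first.
    try self.map.insert(key, value);
    return pos; // bytes consumed; the caller loops until the log is exhausted
}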
pub fn put(self: *Self, key: []const u8, value: []const u8) !void {
try self.putToWal(key, value);
try self.putToList(key, value);
}
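// Sketch (assumption): the approximate_size counter declared above feeds the
// freeze check in writeBatch; it would be bumped on every put, e.g.:
fn updateApproximateSize(self: *Self, key: []const u8, value: []const u8) void {
    _ = self.approximate_size.fetchAdd(key.len + value.len, .monotonic);
}
pub fn getApproximateSize(self: *Self) usize {
    return self.approximate_size.load(.monotonic);
}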
pub fn get(self: *Self, key: []const u8, val: *[]const u8) !bool {
self.lock.lockShared();
defer self.lock.unlockShared();
var vv: []const u8 = undefined;
if (try self.map.get(key, &vv)) {
val.* = vv;
return true;
}
return false;
}
Deletion is represented by writing an empty value (a tombstone) via put(key, "").
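A minimal sketch of how this convention surfaces to callers; the delete wrapper and the empty-value check below are assumptions based on the description above, not code quoted from the repository:
pub fn delete(self: *Self, key: []const u8) !void {
    // A tombstone is just a put with an empty value: it goes through the WAL
    // and the skip-list exactly like a normal write.
    try self.put(key, "");
}

// On the read path, a key that is found but maps to an empty value must be
// treated as deleted rather than returned to the caller.
fn isTombstone(value: []const u8) bool {
    return value.len == 0;
}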
4. SSTable and Block Implementation
Blocks store a sequence of key/value pairs, with each key prefix-compressed against the block's first key. The block builder is defined in src/block.zig:
pub const Block = struct {
data_v: std.ArrayList(u8),
offset_v: std.ArrayList(u16),
};
pub const BlockBuilder = struct {
allocator: std.mem.Allocator,
offset_v: std.ArrayList(u16),
data_v: std.ArrayList(u8),
block_size: usize,
first_key: []u8,
pub fn add(self: *Self, key: []const u8, value: ?[]const u8) !bool {
std.debug.assert(key.len > 0);
const vSize = if (value) |v| v.len else 0;
if ((self.estimated_size() + key.len + vSize + 3 * @sizeOf(u16) > self.block_size) and !self.is_empty()) {
return false;
}
try self.doAdd(key, value);
if (self.first_key.len == 0) {
self.first_key = try self.allocator.dupe(u8, key);
}
return true;
}
fn doAdd(self: *Self, key: []const u8, value: ?[]const u8) !void {
try self.offset_v.append(@intCast(self.data_v.items.len));
const overlap = calculate_overlap(self.first_key, key);
var dw = self.data_v.writer();
try dw.writeInt(u16, @intCast(overlap), .big);
try dw.writeInt(u16, @intCast(key.len - overlap), .big);
_ = try dw.write(key[overlap..]);
if (value) |v| {
try dw.writeInt(u16, @intCast(v.len), .big);
_ = try dw.write(v);
} else {
try dw.writeInt(u16, 0, .big);
}
}
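// Sketch (assumption): calculate_overlap, used by doAdd above, returns the
// length of the common prefix between the block's first key and the new key;
// that prefix is what prefix compression strips from the stored key.
fn calculate_overlap(first_key: []const u8, key: []const u8) usize {
    const max = @min(first_key.len, key.len);
    var i: usize = 0;
    while (i < max and first_key[i] == key[i]) : (i += 1) {}
    return i;
}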
pub fn build(self: *Self) !Block {
if (self.isEmpty()) {
@panic("block is empty");
}
return Block.init(try self.data_v.clone(), try self.offset_v.clone());
}
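// Sketch (assumption): estimated_size, used by add() above, approximates the
// encoded block size: the entry data, one u16 offset per entry, and a trailing
// u16 entry count.
fn estimated_size(self: Self) usize {
    return self.data_v.items.len + self.offset_v.items.len * @sizeOf(u16) + @sizeOf(u16);
}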
};
The SSTable builder (src/ss_table.zig) creates a file consisting of multiple blocks, the block metadata, and a Bloom filter:
pub const SsTableBuilder = struct {
allocator: std.mem.Allocator,
builder: BlockBuilder,
first_key: ?[]const u8,
last_key: ?[]const u8,
meta: std.ArrayList(BlockMeta),
block_size: usize,
data: std.ArrayList(u8),
bloom: BloomFilterPtr,
pub fn add(self: *Self, key: []const u8, value: []const u8) !void {
try self.setFirstKey(key);
try self.bloom.get().insert(key);
if (try self.builder.add(key, value)) {
try self.setLastKey(key);
return;
}
try self.finishBlock();
std.debug.assert(try self.builder.add(key, value));
try self.resetFirstKey(key);
try self.setLastKey(key);
}
fn finishBlock(self: *Self) !void {
if (self.builder.isEmpty()) return;
var bo = self.builder;
defer bo.reset();
self.builder = BlockBuilder.init(self.allocator, self.block_size);
var blk = try bo.build();
defer blk.deinit();
const encoded_block = try blk.encode(self.allocator);
defer self.allocator.free(encoded_block);
try self.meta.append(.{ .allocator = self.allocator, .offset = self.data.items.len, .first_key = try self.allocator.dupe(u8, self.first_key.?), .last_key = try self.allocator.dupe(u8, self.last_key.?) });
const cksm = hash.Crc32.hash(encoded_block);
try self.data.appendSlice(encoded_block);
try self.data.writer().writeInt(u32, cksm, .big);
}
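// Sketch (assumption): the key-tracking helpers called from add() record the
// first and last key of the block currently being built so that finishBlock
// can copy them into the BlockMeta entry; ownership handling is simplified here.
fn setFirstKey(self: *Self, key: []const u8) !void {
    if (self.first_key == null) self.first_key = try self.allocator.dupe(u8, key);
}
fn resetFirstKey(self: *Self, key: []const u8) !void {
    if (self.first_key) |fk| self.allocator.free(fk);
    self.first_key = try self.allocator.dupe(u8, key);
}
fn setLastKey(self: *Self, key: []const u8) !void {
    if (self.last_key) |lk| self.allocator.free(lk);
    self.last_key = try self.allocator.dupe(u8, key);
}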
pub fn build(self: *Self, id: usize, block_cache: ?BlockCachePtr, path: []const u8) !SsTable {
var arena = std.heap.ArenaAllocator.init(self.allocator);
defer arena.deinit();
const allocator = arena.allocator();
try self.finishBlock();
const w = self.data.writer();
const meta_offset = self.data.items.len;
const meta_b = try BlockMeta.batchEncode(self.meta.items, allocator);
_ = try w.write(meta_b);
try w.writeInt(u32, @intCast(meta_offset), .big);
const bloom_offset = self.data.items.len;
const encoded_bloom = try self.bloom.get().encode(allocator);
_ = try w.write(encoded_bloom);
try w.writeInt(u32, @intCast(bloom_offset), .big);
const file = try FileObject.init(path, self.data.items);
errdefer file.deinit();
const fk = self.meta.items[0].first_key;
const lk = self.meta.getLast().last_key;
return .{ .allocator = self.allocator, .file = file, .block_metas = try self.meta.toOwnedSlice(), .meta_offset = meta_offset, .block_cache = block_cache, .bloom = self.bloom.clone(), .id = id, .first_key = try self.allocator.dupe(u8, fk), .last_key = try self.allocator.dupe(u8, lk), .max_ts = 0 };
}
};
5. Write Path
Writes go through a WAL, then into the MemTable (a skip‑list). When the MemTable reaches a size threshold it is frozen into an immutable MemTable and later flushed to an SSTable.
pub fn writeBatch(self: *Self, records: []const WriteBatchRecord) !void {
for (records) |record| {
switch (record) {
.put => |pp| { try self.state.getMemTable().put(pp.key, pp.value); },
.delete => |dd| { try self.state.getMemTable().put(dd, ""); },
}
try self.tryFreeze(self.state.getMemTable().getApproximateSize());
}
}
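// Sketch (assumption): tryFreeze, called from writeBatch above, compares the
// MemTable's approximate size with a configured threshold and freezes it once
// exceeded. A real implementation would re-check the size after taking the
// state lock so that concurrent writers do not freeze twice.
fn tryFreeze(self: *Self, estimated_size: usize) !void {
    if (estimated_size >= self.options.target_sst_size) {
        try self.forceFreezeMemtable();
    }
}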
fn forceFreezeMemtable(self: *Self) !void {
const next_sst_id = self.getNextSstId();
var new_mm: MemTable = undefined;
if (self.options.enable_wal) {
const mm_path = try pathOfWal(self.allocator, self.path, next_sst_id);
defer self.allocator.free(mm_path);
new_mm = MemTable.init(next_sst_id, self.allocator, mm_path);
} else {
new_mm = MemTable.init(next_sst_id, self.allocator, null);
}
errdefer new_mm.deinit();
var old_mm: *MemTable = undefined;
self.state_lock.lock();
defer self.state_lock.unlock();
var old_mm_ptr = self.state.mem_table;
old_mm = old_mm_ptr.get();
defer old_mm_ptr.release();
self.state.mem_table = try MemTablePtr.create(self.allocator, new_mm);
try self.state.imm_mem_tables.append(old_mm_ptr.clone());
try old_mm.syncWal();
}
pub fn flushNextMemtable(self: *Self) !void {
std.debug.assert(self.state.imm_mem_tables.items.len > 0);
var to_flush_table: *MemTable = undefined;
self.state_lock.lockShared();
defer self.state_lock.unlockShared();
to_flush_table = self.state.imm_mem_tables.items[0].load();
var builder = try SsTableBuilder.init(self.allocator, self.options.block_size);
defer builder.deinit();
const sst_id = to_flush_table.id;
try to_flush_table.flush(&builder);
const sst_path = try self.pathOfSst(sst_id);
defer self.allocator.free(sst_path);
var sst = try builder.build(sst_id, self.block_cache.clone(), sst_path);
errdefer sst.deinit();
{
self.state_lock.lock();
defer self.state_lock.unlock();
var m = self.state.imm_mem_tables.orderedRemove(0);
defer m.deinit();
std.debug.assert(m.load().id == sst_id);
try self.state.l0_sstables.append(sst_id);
try self.state.sstables.put(sst.id, try SsTablePtr.create(self.allocator, sst));
}
}
6. Read Path and Iterators
Reading involves merging results from the current MemTable, immutable MemTables, and multiple SSTable levels. The system defines a generic iterator interface (shown in Rust‑style for illustration):
pub trait StorageIterator {
/// Get the current value.
fn value(&self) -> &[u8];
/// Get the current key.
fn key(&self) -> &[u8];
/// Check if the iterator is empty.
fn is_empty(&self) -> bool;
/// Move to the next entry.
fn next(&mut self) -> anyhow::Result<()>;
fn num_active_iterators(&self) -> usize { 1 }
}
Specific iterators for the MemTable, SSTable, and Block implement this trait. Example: the SSTable iterator:
pub const SsTableIterator = struct {
allocator: std.mem.Allocator,
table: SsTablePtr,
blk: BlockPtr,
blk_iterator: BlockIteratorPtr,
blk_idx: usize,
const Self = @This();
pub fn initAndSeekToFirst(allocator: std.mem.Allocator, table: SsTablePtr) !Self {
const s = try seekToFirstInner(allocator, table);
return .{ .allocator = allocator, .table = table, .blk_iterator = s.blk_iter, .blk = s.blk, .blk_idx = 0 };
}
pub fn initAndSeekToKey(allocator: std.mem.Allocator, table: SsTablePtr, k: []const u8) !Self {
const b = try seekToKeyInner(allocator, table, k);
return .{ .allocator = allocator, .table = table, .blk_iterator = b.blk_iter, .blk_idx = b.blk_idx, .blk = b.blk };
}
fn seekToFirstInner(allocator: std.mem.Allocator, table: SsTablePtr) !struct { blk: BlockPtr, blk_iter: BlockIteratorPtr } {
var blk = try table.get().readBlockCached(0, allocator);
errdefer blk.release();
var blk_iter = try BlockIterator.createAndSeekToFirst(allocator, blk.clone());
errdefer blk_iter.deinit();
return .{ .blk = blk, .blk_iter = try BlockIteratorPtr.create(allocator, blk_iter) };
}
fn seekToKeyInner(allocator: std.mem.Allocator, table: SsTablePtr, k: []const u8) !struct { blk_idx: usize, blk: BlockPtr, blk_iter: BlockIteratorPtr } {
const table_ptr = table.get();
var blk_idx = try table_ptr.findBlockIndex(k);
var blk = try table_ptr.readBlockCached(blk_idx, allocator);
errdefer blk.deinit();
var blk_iter = try BlockIterator.createAndSeekToKey(allocator, blk.clone(), k);
errdefer blk_iter.deinit();
var blk_iter_ptr = try BlockIteratorPtr.create(allocator, blk_iter);
errdefer blk_iter_ptr.release();
if (blk_iter.isEmpty()) {
blk_idx += 1;
if (blk_idx < table_ptr.numBlocks()) {
blk.deinit();
blk_iter.deinit();
var blk2 = try table_ptr.readBlockCached(blk_idx, allocator);
errdefer blk2.deinit();
var blk_iter2 = try BlockIterator.createAndSeekToFirst(allocator, blk2.clone());
errdefer blk_iter2.deinit();
return .{ .blk_idx = blk_idx, .blk_iter = try BlockIteratorPtr.create(allocator, blk_iter2), .blk = blk2 };
}
}
return .{ .blk_idx = blk_idx, .blk_iter = blk_iter_ptr, .blk = blk };
}
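// Sketch (assumption): findBlockIndex, called in seekToKeyInner above, lives on
// SsTable and binary-searches the block metadata for the last block whose
// first_key <= k, i.e. the only block that can contain k. The helper name and
// signature here are hypothetical.
fn findBlockIndexSketch(metas: []const BlockMeta, k: []const u8) usize {
    var lo: usize = 0;
    var hi: usize = metas.len;
    while (lo < hi) {
        const mid = lo + (hi - lo) / 2;
        if (std.mem.lessThan(u8, k, metas[mid].first_key)) {
            hi = mid;
        } else {
            lo = mid + 1;
        }
    }
    return if (lo == 0) 0 else lo - 1;
}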
pub fn key(self: Self) []const u8 { return self.blk_iterator.get().key(); }
pub fn value(self: Self) []const u8 { return self.blk_iterator.get().value(); }
pub fn isEmpty(self: Self) bool { return self.blk_iterator.get().isEmpty(); }
pub fn next(self: *Self) !void {
try self.blk_iterator.get().next();
if (self.blk_iterator.get().isEmpty()) {
self.blk_idx += 1;
if (self.blk_idx < self.table.get().numBlocks()) {
self.reset();
const blk = try self.table.get().readBlockCached(self.blk_idx, self.allocator);
const blk_iter = try BlockIterator.createAndSeekToFirst(self.allocator, blk.clone());
self.blk = blk;
self.blk_iterator = try BlockIteratorPtr.create(self.allocator, blk_iter);
}
}
}
};
To merge multiple iterators, a binary-heap based MergeIterator is used. The heap always yields the iterator with the smallest current key (or, when keys are equal, the newest iterator):
pub fn init(allocator: std.mem.Allocator, iters: std.ArrayList(StorageIteratorPtr)) !Self {
var q = IteratorHeap.init(allocator, .{});
if (iters.items.len == 0) {
return Self{ .allocator = allocator, .q = q, .current = null };
}
for (iters.items, 0..) |sp, i| {
if (!sp.load().isEmpty()) {
const hw = try allocator.create(HeapWrapper);
errdefer allocator.destroy(hw);
hw.* = HeapWrapper.init(i, sp.clone());
try q.add(hw);
}
}
const cc = q.removeOrNull();
return Self{ .allocator = allocator, .q = q, .current = cc };
}
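// Sketch (assumption): the heap's compare function orders wrappers by their
// current key and, on equal keys, by insertion index, so the iterator that was
// added first (the newest data source) comes out of the heap first. The field
// names idx/iter on HeapWrapper are hypothetical.
fn heapLessThan(_: void, a: *HeapWrapper, b: *HeapWrapper) std.math.Order {
    const by_key = std.mem.order(u8, a.iter.load().key(), b.iter.load().key());
    if (by_key != .eq) return by_key;
    return std.math.order(a.idx, b.idx);
}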
pub fn next(self: *Self) !void {
const cc = self.current.?;
while (true) {
if (self.q.peek()) |ii| {
std.debug.assert(!ii.isEmpty());
if (std.mem.eql(u8, cc.key(), ii.key())) {
try ii.next();
if (ii.isEmpty()) {
_ = self.q.remove();
ii.deinit();
self.allocator.destroy(ii);
}
} else {
break;
}
}
break;
}
try cc.next();
if (cc.isEmpty()) {
defer { cc.deinit(); self.allocator.destroy(cc); }
if (self.q.removeOrNull()) |h| {
self.current = h;
} else {
self.current = null;
}
return;
}
try self.q.add(cc);
self.current = self.q.removeOrNull();
}
When only two iterators need to be merged, a lightweight TwoMergeIterator is employed. On equal keys it advances b past the duplicate (skipB), so iterator a (the newer source) always wins:
pub const TwoMergeIterator = struct {
a: StorageIteratorPtr,
b: StorageIteratorPtr,
choose_a: bool,
fn chooseA(a: *StorageIterator, b: *StorageIterator) bool {
if (a.isEmpty()) return false;
if (b.isEmpty()) return true;
return std.mem.lessThan(u8, a.key(), b.key());
}
fn skipB(self: *TwoMergeIterator) !void {
const ap = self.a.load();
const bp = self.b.load();
if (!ap.isEmpty() and !bp.isEmpty() and std.mem.eql(u8, ap.key(), bp.key())) try bp.next();
}
pub fn init(a: StorageIteratorPtr, b: StorageIteratorPtr) !TwoMergeIterator {
var iter = TwoMergeIterator{ .a = a, .b = b, .choose_a = false };
try iter.skipB();
iter.choose_a = chooseA(iter.a.load(), iter.b.load());
return iter;
}
pub fn key(self: TwoMergeIterator) []const u8 { return if (self.choose_a) self.a.load().key() else self.b.load().key(); }
pub fn value(self: TwoMergeIterator) []const u8 { return if (self.choose_a) self.a.load().value() else self.b.load().value(); }
pub fn isEmpty(self: TwoMergeIterator) bool { return if (self.choose_a) self.a.load().isEmpty() else self.b.load().isEmpty(); }
pub fn next(self: *TwoMergeIterator) !void {
if (self.choose_a) {
try self.a.load().next();
} else {
try self.b.load().next();
}
try self.skipB();
self.choose_a = chooseA(self.a.load(), self.b.load());
}
pub fn deinit(self: *TwoMergeIterator) void { self.a.release(); self.b.release(); }
};
7. Compaction
Compaction merges the SSTables of an upper level into the level below it. L0 compaction is triggered once the number of L0 files reaches level0_file_num_compaction_trigger; for deeper levels, compaction is triggered when the lower level holds too few files relative to the upper level, i.e. when lower_size * 100 / upper_size falls below size_ration_percent (with a threshold of 200, for example, a level is compacted down whenever the level beneath it has fewer than twice as many files). The controller generates a compaction task:
pub const SimpleLeveledCompactionController = struct {
options: SimpleLeveledCompactionOptions,
pub fn generateCompactionTask(self: SimpleLeveledCompactionController, state: *storage.StorageState) !?SimpleLeveledCompactionTask {
if (self.options.max_levels == 1) return null;
var level_sizes = std.ArrayList(usize).init(state.allocator);
defer level_sizes.deinit();
try level_sizes.append(state.l0_sstables.items.len);
for (state.levels.items) |level| { try level_sizes.append(level.items.len); }
if (state.l0_sstables.items.len >= self.options.level0_file_num_compaction_trigger) {
return .{ .upper_level = null, .upper_level_sst_ids = try state.l0_sstables.clone(), .lower_level = 1, .lower_level_sst_ids = try state.levels.items[0].clone(), .is_lower_level_bottom = false };
}
for (1..self.options.max_levels) |level| {
const lower_level = level + 1;
if (level_sizes.items[level] == 0) continue;
const size_ratio = level_sizes.items[lower_level] * 100 / level_sizes.items[level];
if (size_ratio < self.options.size_ration_percent) {
return .{ .upper_level = level, .upper_level_sst_ids = try state.levels.items[level - 1].clone(), .lower_level = lower_level, .lower_level_sst_ids = try state.levels.items[lower_level - 1].clone(), .is_lower_level_bottom = lower_level == self.options.max_levels };
}
}
return null;
}
};
The actual compaction drains the merged iterator and writes out new SSTables, starting a fresh one whenever the builder reaches the target size (tombstones are dropped when compacting into the bottom level):
fn compactSimple(self: *Self, task: SimpleLeveledCompactionTask) !std.ArrayList(SsTablePtr) {
if (task.upper_level) |_| {
var upper_ssts = try std.ArrayList(SsTablePtr).initCapacity(self.allocator, task.upper_level_sst_ids.items.len);
var lower_ssts = try std.ArrayList(SsTablePtr).initCapacity(self.allocator, task.lower_level_sst_ids.items.len);
self.state_lock.lockShared();
for (task.upper_level_sst_ids.items) |sst_id| { const sst = self.state.sstables.get(sst_id).?; try upper_ssts.append(sst.clone()); }
for (task.lower_level_sst_ids.items) |sst_id| { const sst = self.state.sstables.get(sst_id).?; try lower_ssts.append(sst.clone()); }
self.state_lock.unlockShared();
var upper_iter = try SstConcatIterator.initAndSeekToFirst(self.allocator, upper_ssts);
errdefer upper_iter.deinit();
var lower_iter = try SstConcatIterator.initAndSeekToFirst(self.allocator, lower_ssts);
errdefer lower_iter.deinit();
var iter = try TwoMergeIterator.init(try StorageIteratorPtr.create(self.allocator, .{ .sst_concat_iter = upper_iter }), try StorageIteratorPtr.create(self.allocator, .{ .sst_concat_iter = lower_iter }));
defer iter.deinit();
return self.compactGenerateSstFromIter(&iter, task.is_lower_level_bottom);
} else {
// L0 -> L1 compaction: `iter` is constructed similarly over the L0 and L1 tables (omitted for brevity)
return self.compactGenerateSstFromIter(&iter, task.is_lower_level_bottom);
}
}
fn compactGenerateSstFromIter(self: *Self, iter: *TwoMergeIterator, compact_to_bottom_level: bool) !std.ArrayList(SsTablePtr) {
var builder = try SsTableBuilder.init(self.allocator, self.options.block_size);
defer builder.deinit();
var new_ssts = std.ArrayList(SsTablePtr).init(self.allocator);
while (!iter.isEmpty()) {
if (compact_to_bottom_level) {
if (iter.value().len > 0) try builder.add(iter.key(), iter.value());
} else {
try builder.add(iter.key(), iter.value());
}
if (builder.estimatedSize() >= self.options.target_sst_size) {
defer builder.reset() catch unreachable;
const sst_id = self.getNextSstId();
const path = try self.pathOfSst(sst_id);
defer self.allocator.free(path);
var sst = try builder.build(sst_id, self.block_cache.clone(), path);
errdefer sst.deinit();
var sst_ptr = try SsTablePtr.create(self.allocator, sst);
errdefer sst_ptr.deinit();
try new_ssts.append(sst_ptr);
}
try iter.next();
}
if (builder.estimatedSize() > 0) {
const sst_id = self.getNextSstId();
const path = try self.pathOfSst(sst_id);
defer self.allocator.free(path);
var sst = try builder.build(sst_id, self.block_cache.clone(), path);
errdefer sst.deinit();
var sst_ptr = try SsTablePtr.create(self.allocator, sst);
errdefer sst_ptr.deinit();
try new_ssts.append(sst_ptr);
}
return new_ssts;
}
After generating the new SSTables, the system updates the state and removes the obsolete files:
pub fn applyCompactionResult(_: SimpleLeveledCompactionController, state: *storage.StorageState, task: SimpleLeveledCompactionTask, output: []usize) !std.ArrayList(usize) {
var files_to_remove = std.ArrayList(usize).init(state.allocator);
errdefer files_to_remove.deinit();
if (task.upper_level) |upper_level| {
std.debug.assert(sliceEquals(task.upper_level_sst_ids.items, state.levels.items[upper_level - 1].items));
try files_to_remove.appendSlice(task.upper_level_sst_ids.items);
state.levels.items[upper_level - 1].clearAndFree();
} else {
try files_to_remove.appendSlice(task.upper_level_sst_ids.items);
var new_l0_sstables = std.ArrayList(usize).init(state.allocator);
errdefer new_l0_sstables.deinit();
var l0_sst_compacted = std.AutoHashMap(usize, struct{}).init(state.allocator);
defer l0_sst_compacted.deinit();
for (task.upper_level_sst_ids.items) |sst_id| { try l0_sst_compacted.put(sst_id, .{}); }
for (state.l0_sstables.items) |sst_id| {
if (!l0_sst_compacted.remove(sst_id)) try new_l0_sstables.append(sst_id);
}
std.debug.assert(l0_sst_compacted.count() == 0);
state.l0_sstables.deinit();
state.l0_sstables = new_l0_sstables;
}
try files_to_remove.appendSlice(task.lower_level_sst_ids.items);
state.levels.items[task.lower_level - 1].clearAndFree();
try state.levels.items[task.lower_level - 1].appendSlice(output);
return files_to_remove;
}
The compaction driver then installs the new SSTables into the shared state, applies the level changes computed above, and finally deletes the obsolete SSTable files from disk:
var ssts_to_remove = std.ArrayList(SsTablePtr).init(self.allocator);
{
var new_sst_ids = std.ArrayList(usize).init(self.allocator);
errdefer new_sst_ids.deinit();
self.state_lock.lock();
defer self.state_lock.unlock();
for (sstables.items) |sst| {
const id: usize = @intCast(sst.get().sstId());
try new_sst_ids.append(id);
try self.state.sstables.put(id, sst.clone());
}
var file_to_remove = try self.compaction_controller.applyCompactionResult(&self.state, task, output.items);
defer file_to_remove.deinit();
for (file_to_remove.items) |id| {
if (self.state.sstables.fetchRemove(id)) |kv| {
try ssts_to_remove.append(kv.value);
}
}
try self.syncDir();
}
for (ssts_to_remove.items) |sst| {
const path = try self.pathOfSst(sst.get().sstId());
defer self.allocator.free(path);
try std.fs.cwd().deleteFile(path);
}
try self.syncDir();
8. Summary
The article demonstrates a full LSM-Tree implementation in Zig, covering in-memory structures, immutable on-disk files, write-ahead logging, iterator composition, and multi-level compaction. The author highlights key engineering lessons such as extensive use of assertions, unit testing, and keeping I/O out of critical sections.