Background Log Management Threads, Log Cleaning, and Compaction in Kafka
This article explains how Kafka's LogManager loads existing logs and manages the background threads for flushing, checkpointing, cleaning, and compaction. It then walks through the code and strategies for log retention, segment cleanup, and log compaction in a distributed storage environment.
In a distributed storage system like Kafka, the LogManager must load all existing logs when the broker starts, much as a Log loads its Segments. It scans the directories listed in logDirs, which holds the value of the log.dirs setting. Each TopicPartition directory corresponds to one Log instance, and the LogManager can only start operating once every Log instance has been loaded.
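With log.dirs set to /tmp/kafka_logs1,/tmp/kafka_logs2, the article illustrates the directory layout and how Log objects relate to their directories. Each log directory contains one subdirectory per TopicPartition, named <topic>-<partition>, and each such subdirectory backs exactly one Log.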
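The layout can be made concrete with a minimal sketch that maps partition directories to (topic, partition) pairs. The topics topicA, topicB, and my-topic are hypothetical, and so are the names LogDirLayoutSketch and partitionsByLogDir; they are not Kafka APIs. Like Kafka's Log.parseTopicPartitionName, the sketch splits a directory name at the last '-', because topic names may themselves contain hyphens.

```scala
import java.io.File

// Illustrative sketch; LogDirLayoutSketch and its methods are hypothetical names
object LogDirLayoutSketch {
  /** Splits a partition directory name "<topic>-<partition>" at the LAST '-',
    * because topic names may themselves contain '-'. */
  def parseTopicPartitionName(dir: File): (String, Int) = {
    val name = dir.getName
    val index = name.lastIndexOf('-')
    require(index > 0 && index < name.length - 1, s"Not a topic-partition directory: $name")
    (name.substring(0, index), name.substring(index + 1).toInt)
  }

  /** For each log dir, the (topic, partition) pairs of its partition subdirectories,
    * i.e. one future Log per entry. Plain files (e.g. checkpoint files) are skipped. */
  def partitionsByLogDir(logDirs: Seq[File]): Map[File, Seq[(String, Int)]] =
    logDirs.map { dir =>
      val children = Option(dir.listFiles).map(_.toSeq).getOrElse(Seq.empty)
      dir -> children.filter(_.isDirectory).map(parseTopicPartitionName).sorted
    }.toMap
}
```

For example, /tmp/kafka_logs1/my-topic-2 parses to ("my-topic", 2). Splitting at the first '-' would wrongly produce the topic "my".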
The Scala class LogManager creates a pool of logs and a recovery-point checkpoint for each directory, then loads the logs in parallel, using one thread pool per directory:
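```scala
import java.io.File
import java.util.concurrent.{ExecutorService, Executors, Future}
import scala.collection.mutable

// Simplified from Kafka's LogManager (0.8-0.10 era). Kafka-internal types (Log, Pool, OffsetCheckpoint,
// TopicAndPartition, LogConfig, CleanerConfig, LogCleaner, Scheduler, Time, CoreUtils) are assumed in scope.
class LogManager(val logDirs: Array[File], topicConfigs: Map[String, LogConfig], defaultConfig: LogConfig,
                 cleanerConfig: CleanerConfig, ioThreads: Int, flushCheckMs: Long,
                 flushCheckpointMs: Long, retentionCheckMs: Long, scheduler: Scheduler, time: Time) {
  private val InitialTaskDelayMs = 30 * 1000L
  // One Log per TopicPartition directory, across all log.dirs
  val logs = new Pool[TopicAndPartition, Log]()
  // One recovery-point checkpoint file per log directory
  val recoveryPointCheckpoints: Map[File, OffsetCheckpoint] =
    logDirs.map(dir => dir -> new OffsetCheckpoint(new File(dir, "recovery-point-offset-checkpoint"))).toMap

  loadLogs() // all logs must be loaded before the LogManager is used

  private def loadLogs(): Unit = {
    val threadPools = mutable.ArrayBuffer.empty[ExecutorService]
    val jobs = mutable.ArrayBuffer.empty[Future[_]]
    for (dir <- logDirs) {
      // Each log directory gets its own pool of ioThreads threads
      val pool = Executors.newFixedThreadPool(ioThreads)
      threadPools.append(pool)
      val recoveryPoints: Map[TopicAndPartition, Long] = recoveryPointCheckpoints(dir).read()
      val jobsForDir = for {
        dirContent <- Option(dir.listFiles).toList
        logDir <- dirContent if logDir.isDirectory
      } yield {
        CoreUtils.runnable {
          val topicPartition = Log.parseTopicPartitionName(logDir)
          val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
          // Recovery starts at the last checkpointed offset (0 if none was recorded)
          val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L)
          val current = new Log(logDir, config, logRecoveryPoint, scheduler, time)
          logs.put(topicPartition, current)
        }
      }
      jobs ++= jobsForDir.map(pool.submit)
    }
    // Wait for every load job (rethrowing any failure), then release the pools
    try jobs.foreach(_.get())
    finally threadPools.foreach(_.shutdown())
  }

  def allLogs(): Iterable[Log] = logs.values
  // parent directory path -> logs stored in that directory
  def logsByDir: Map[String, Map[TopicAndPartition, Log]] =
    logs.toMap.groupBy { case (_, log) => log.dir.getParent }

  val cleaner: LogCleaner = new LogCleaner(cleanerConfig, logDirs, logs)

  def startup(): Unit = {
    scheduler.schedule("log-retention", cleanupLogs _, delay = InitialTaskDelayMs, period = retentionCheckMs)
    scheduler.schedule("log-flusher", flushDirtyLogs _, delay = InitialTaskDelayMs, period = flushCheckMs)
    scheduler.schedule("recovery-point-checkpoint", checkpointRecoveryPointOffsets _,
                       delay = InitialTaskDelayMs, period = flushCheckpointMs)
    if (cleanerConfig.enableCleaner) cleaner.startup()
  }
}
```

After loadLogs has run, the LogManager starts several scheduled background tasks that operate on the loaded logs (shown in Table 3‑7): log retention (cleanupLogs), flushing (flushDirtyLogs), and recovery-point checkpointing (checkpointRecoveryPointOffsets). If the log cleaner is enabled (log.cleaner.enable), it also starts the cleaner threads that perform compaction.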
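The periodic scheduling in startup() relies on Kafka's internal Scheduler. The same idea can be shown with only the JDK's ScheduledExecutorService in a minimal sketch; BackgroundTasksSketch and schedulePeriodic are hypothetical names. Each task runs at a fixed rate. Exceptions are caught inside the task because, with scheduleAtFixedRate, a run that throws suppresses all later runs.

```scala
import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture, TimeUnit}

// Illustrative sketch; BackgroundTasksSketch and schedulePeriodic are hypothetical names
object BackgroundTasksSketch {
  /** Runs `task` every `periodMs` milliseconds after an initial `delayMs`.
    * Exceptions are caught inside the task: with scheduleAtFixedRate, a run
    * that throws would suppress all subsequent runs. */
  def schedulePeriodic(executor: ScheduledExecutorService, name: String,
                       task: () => Unit, delayMs: Long, periodMs: Long): ScheduledFuture[_] = {
    val runnable = new Runnable {
      def run(): Unit =
        try task()
        catch { case e: Exception => System.err.println(s"Uncaught exception in scheduled task '$name': $e") }
    }
    executor.scheduleAtFixedRate(runnable, delayMs, periodMs, TimeUnit.MILLISECONDS)
  }
}
```

In a broker, the three tasks would use the intervals configured by log.retention.check.interval.ms, log.flush.scheduler.interval.ms, and log.flush.offset.checkpoint.interval.ms.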
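The flush task forces a log's pending data to disk once the time since its last flush reaches the log's flush.ms setting. The checkpoint task periodically records each partition's recovery point, the offset up to which its data is known to be on disk, so that after a crash only the data beyond that point needs to be recovered. Each log.dirs directory contains a single checkpoint file shared by all the partitions stored in it.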
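The following is a minimal sketch of such a checkpoint file. It assumes the plain-text layout used by Kafka's OffsetCheckpoint in the versions this code comes from: a version line, an entry count, then one `topic partition offset` line per partition. The new contents are written to a .tmp file and then moved over the old one, so a crash mid-write leaves the previous checkpoint intact. CheckpointSketch is a hypothetical name, and unlike Kafka, this sketch does not fsync before the move.

```scala
import java.io.File
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.{Files, StandardCopyOption}

// Illustrative sketch; CheckpointSketch is a hypothetical name, keys are (topic, partition)
object CheckpointSketch {
  val Version = 0

  def write(file: File, offsets: Map[(String, Int), Long]): Unit = {
    val lines = Seq(Version.toString, offsets.size.toString) ++
      offsets.map { case ((topic, partition), offset) => s"$topic $partition $offset" }
    val tmp = new File(file.getPath + ".tmp")
    Files.write(tmp.toPath, lines.mkString("", "\n", "\n").getBytes(UTF_8))
    // Replace the old checkpoint in one step so readers never see a half-written file
    Files.move(tmp.toPath, file.toPath, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE)
  }

  def read(file: File): Map[(String, Int), Long] = {
    if (!file.exists()) return Map.empty // no checkpoint yet: every recovery point defaults to 0
    val lines = new String(Files.readAllBytes(file.toPath), UTF_8).split("\n").toSeq
    require(lines.head.toInt == Version, s"Unsupported checkpoint version ${lines.head}")
    val expected = lines(1).toInt
    val entries = lines.drop(2).filter(_.nonEmpty).map { line =>
      val Array(topic, partition, offset) = line.split(" ")
      (topic, partition.toInt) -> offset.toLong
    }.toMap
    require(entries.size == expected, s"Expected $expected entries but found ${entries.size}")
    entries
  }
}
```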
```scala
// Flush every log whose time since its last flush has reached its flush.ms setting
private def flushDirtyLogs(): Unit = {
  for ((_, log) <- logs) {
    val timeSinceLastFlush = time.milliseconds - log.lastFlushTime
    if (timeSinceLastFlush >= log.config.flushMs) log.flush()
  }
}
```
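```scala
// Write each log directory's recovery points to that directory's checkpoint file
def checkpointRecoveryPointOffsets(): Unit = {
  logDirs.foreach(checkpointLogsInDir)
}

private def checkpointLogsInDir(dir: File): Unit = {
  // logsByDir is keyed by directory path; a directory may currently hold no logs
  logsByDir.get(dir.toString).foreach { logsInDir =>
    recoveryPointCheckpoints(dir).write(logsInDir.map { case (tp, log) => tp -> log.recoveryPoint })
  }
}
```

Log cleaning removes Segments that have expired or that push a log past its size limit. Logs configured for compaction (cleanup.policy=compact) are skipped here and handled by the log cleaner instead. The Scala implementations below cover the cleanup task and the rules for selecting segments to delete. The deletion itself is asynchronous: a segment's files are first renamed with a .deleted suffix and physically removed after a delay.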
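The asynchronous removal can be illustrated with a minimal sketch built on the JDK scheduler. AsyncSegmentDeletion and asyncDelete are hypothetical names. The file is renamed with a .deleted suffix at once and physically deleted after delayMs; in Kafka, the delay comes from file.delete.delay.ms (60 seconds by default). Kafka renames both the segment's .log and .index files, while this sketch handles a single file.

```scala
import java.io.{File, IOException}
import java.util.concurrent.{Callable, ScheduledExecutorService, ScheduledFuture, TimeUnit}

// Illustrative sketch; AsyncSegmentDeletion and asyncDelete are hypothetical names, not Kafka APIs
object AsyncSegmentDeletion {
  // Suffix that marks a file as scheduled for deletion
  val DeletedFileSuffix = ".deleted"

  /** Renames `file` to `<name>.deleted` immediately and deletes the renamed file after `delayMs`.
    * Returns the renamed file and the pending deletion task (true if the delete succeeded). */
  def asyncDelete(file: File, scheduler: ScheduledExecutorService, delayMs: Long): (File, ScheduledFuture[Boolean]) = {
    val renamed = new File(file.getPath + DeletedFileSuffix)
    if (!file.renameTo(renamed)) throw new IOException(s"Failed to rename $file to $renamed")
    val deleteTask = new Callable[Boolean] {
      def call(): Boolean = renamed.delete()
    }
    (renamed, scheduler.schedule(deleteTask, delayMs, TimeUnit.MILLISECONDS))
  }
}
```

Renaming first takes the segment out of use right away. The delay gives any reader still holding the file time to finish before the data disappears.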
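```scala
// Periodic retention task: delete Segments by age and by size (compacted logs are skipped)
def cleanupLogs(): Unit = {
  var total = 0 // number of segments deleted in this run
  for (log <- allLogs() if !log.config.compact)
    total += cleanupExpiredSegments(log) + cleanupSegmentsToMaintainSize(log)
}

// Delete Segments whose last modification is older than retention.ms
private def cleanupExpiredSegments(log: Log): Int = {
  if (log.config.retentionMs < 0) return 0 // negative retention.ms: no time limit
  val now = time.milliseconds
  log.deleteOldSegments(segment => now - segment.lastModified > log.config.retentionMs)
}
```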
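Both rules delegate to deleteOldSegments. In the Kafka versions this code is drawn from, that method scans segments oldest first with takeWhile, so it only ever deletes a prefix of the log. It stops at the first segment that fails the predicate and keeps an empty last segment. The minimal sketch below models that behavior together with the time rule above and the size rule that follows. Segment, RetentionSketch, and its methods are hypothetical names, not Kafka's classes.

```scala
// Illustrative sketch; Segment and RetentionSketch are hypothetical stand-ins, not Kafka classes
object RetentionSketch {
  final case class Segment(baseOffset: Long, size: Long, lastModified: Long)

  /** Oldest-first scan that stops at the first segment failing `predicate`, so only a
    * prefix of the log is selected; an empty last (active) segment is never selected. */
  def selectForDeletion(segments: Seq[Segment])(predicate: Segment => Boolean): Seq[Segment] = {
    val last = segments.size - 1
    segments.zipWithIndex
      .takeWhile { case (s, i) => predicate(s) && (i != last || s.size > 0) }
      .map(_._1)
  }

  // Time rule (retention.ms): a negative value disables it
  def expired(now: Long, retentionMs: Long): Segment => Boolean =
    s => retentionMs >= 0 && now - s.lastModified > retentionMs

  // Size rule (retention.bytes): drop oldest segments while the log stays >= the limit
  def overSize(segments: Seq[Segment], retentionBytes: Long): Segment => Boolean = {
    var diff = segments.map(_.size).sum - retentionBytes
    s => if (retentionBytes >= 0 && diff - s.size >= 0) { diff -= s.size; true } else false
  }
}
```

Because of the prefix scan, an expired segment sitting behind a newer one is kept until everything before it also qualifies.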
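```scala
// Delete the oldest Segments as long as the log stays at or above retention.bytes
private def cleanupSegmentsToMaintainSize(log: Log): Int = {
  // Negative retention.bytes means no size limit; nothing to do if already under the limit
  if (log.config.retentionSize < 0 || log.size < log.config.retentionSize) return 0
  var diff = log.size - log.config.retentionSize
  def shouldDelete(segment: LogSegment): Boolean = {
    if (diff - segment.size >= 0) { diff -= segment.size; true } else false
  }
  log.deleteOldSegments(shouldDelete)
}
```

Log compaction ensures that, for each key, only the latest value is retained. The cleaner works on whole Segments: the active Segment is never compacted, only the older, inactive ones. The cleaner's I/O can be throttled (log.cleaner.io.max.bytes.per.second) to limit its impact on the broker.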
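The core of compaction fits in a minimal sketch of the two-pass idea the Kafka cleaner uses. The first pass builds a map from each key to its latest offset. The second pass copies only the records whose offset matches that entry. Record and CompactionSketch are hypothetical names. The sketch compares keys directly, whereas Kafka's cleaner stores key hashes in a fixed-size offset map. The input is assumed to contain only inactive Segments.

```scala
// Illustrative sketch; Record and CompactionSketch are hypothetical names
object CompactionSketch {
  final case class Record(offset: Long, key: String, value: String)

  /** Keeps only the latest record for each key; surviving records keep their
    * original offsets and relative order. */
  def compact(records: Seq[Record]): Seq[Record] = {
    // Pass 1: key -> offset of its latest record
    val latestOffset: Map[String, Long] =
      records.foldLeft(Map.empty[String, Long])((m, r) => m.updated(r.key, r.offset))
    // Pass 2: copy a record only if it is the latest one for its key
    records.filter(r => latestOffset(r.key) == r.offset)
  }
}
```

The surviving offsets have gaps (for example 2, 3, 4 after offsets 0 and 1 are removed). Compaction never renumbers records.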
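Partitions are selected for compaction by their cleanableRatio. The LogToClean case class computes it as the dirty bytes (records written since the last cleaning) divided by the total bytes (clean + dirty). The dirty range stops at the active Segment's base offset, so the active Segment never counts. The cleaner only considers logs whose ratio exceeds min.cleanable.dirty.ratio and, among them, cleans the one with the highest ratio first. That is why LogToClean extends Ordered.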
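As a worked example with hypothetical sizes in MB, log 1 has 600 clean and 400 dirty, and log 2 has 200 clean and 600 dirty:

```latex
\text{cleanableRatio} = \frac{\text{dirtyBytes}}{\text{cleanBytes} + \text{dirtyBytes}},
\qquad
\text{log 1: } \frac{400}{600 + 400} = 0.40,
\qquad
\text{log 2: } \frac{600}{200 + 600} = 0.75
```

With the default min.cleanable.dirty.ratio of 0.5, log 1 is not yet eligible, and log 2 is cleaned.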
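```scala
// A cleaning candidate; Ordered so the dirtiest log can be picked with max
case class LogToClean(topicPartition: TopicAndPartition, log: Log, firstDirtyOffset: Long)
    extends Ordered[LogToClean] {
  // Segments already cleaned: from the start of the log up to firstDirtyOffset
  val cleanBytes = log.logSegments(-1, firstDirtyOffset).map(_.size).sum
  // Segments not yet cleaned, stopping at (and excluding) the active segment
  val dirtyBytes = log.logSegments(firstDirtyOffset, math.max(firstDirtyOffset, log.activeSegment.baseOffset)).map(_.size).sum
  // Guard against 0 / 0, which would give NaN for an empty log
  val cleanableRatio = if (totalBytes == 0) 0.0 else dirtyBytes / totalBytes.toDouble
  def totalBytes = cleanBytes + dirtyBytes
  // compare must return an Int; Double.compare does that directly
  override def compare(that: LogToClean): Int = java.lang.Double.compare(this.cleanableRatio, that.cleanableRatio)
}
```

A delete is written as a tombstone, a record with a key and a null value. Compaction removes every earlier record for that key, and the tombstone itself is kept for delete.retention.ms (24 hours by default) before it too is removed. The article concludes with the guarantees that log compaction provides:
- records stay in their original order, because compaction removes records but never reorders them;
- a record's offset never changes;
- a consumer that stays caught up with the head of the log sees every record written;
- a consumer that reads from the start of the log sees at least the final value of every key, including any tombstone it reaches within delete.retention.ms.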
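The dedup sketch extends naturally to tombstones. In this minimal version, a hypothetical Record uses value = None to stand for Kafka's null value, and TombstoneSketch is likewise a hypothetical name. As a simplification, a tombstone's age is measured from a per-record timestamp. Kafka instead derives a delete horizon from the last-modified time of the already-cleaned segments minus delete.retention.ms.

```scala
// Illustrative sketch; Record and TombstoneSketch are hypothetical names
object TombstoneSketch {
  // value = None marks a tombstone (a null value in Kafka)
  final case class Record(offset: Long, key: String, value: Option[String], timestamp: Long)

  /** Keeps the latest record per key. A latest record that is a tombstone is kept only
    * while it is younger than deleteRetentionMs, so consumers can still observe the delete. */
  def compact(records: Seq[Record], now: Long, deleteRetentionMs: Long): Seq[Record] = {
    val latest: Map[String, Long] =
      records.foldLeft(Map.empty[String, Long])((m, r) => m.updated(r.key, r.offset))
    records.filter { r =>
      val isLatest = latest(r.key) == r.offset
      val keep = r.value.isDefined || now - r.timestamp < deleteRetentionMs
      isLatest && keep
    }
  }
}
```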
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact us and we will review it promptly.
Architect
Professional architect sharing high‑quality architecture insights. Topics include high‑availability, high‑performance, high‑stability architectures, big data, machine learning, Java, system and distributed architecture, AI, and practical large‑scale architecture case studies. Open to ideas‑driven architects who enjoy sharing and learning.