Master Real‑Time Kubernetes Logs with Fabric8 Client: Stream, Filter, and De‑Duplicate
This article introduces the log APIs of the Fabric8 Kubernetes Client, shows how to stream pod logs with watchLog(), explains periodic log pulling with sinceSeconds(), compares three deduplication strategies, and provides Java implementations, including LinkedHashMap-based caches, for processing Kubernetes logs reliably.
Fabric8 Kubernetes Client Log API Overview
getLog() – one‑time retrieval of a pod’s log (suitable for short‑lived or static logs).
inContainer("name").getLog() – fetch logs from a specific container inside a multi‑container pod.
watchLog() – returns a LogWatch whose getOutput() provides a live InputStream for real‑time log streaming.
watchLog(OutputStream) – stream logs directly to an OutputStream (e.g., System.out or a file).
tailingLines(n).getLog() – retrieve only the latest n lines.
sinceSeconds(n).getLog() – fetch logs generated in the last n seconds.
usingTimestamps().getLog() – prefix each log line with its RFC 3339 timestamp.
terminated().getLog() – obtain logs from the previous instance of a terminated or restarted container.
withLabel("key", "value").list() – list all pods that carry a given label; each pod's log is then fetched individually with the calls above.
Preferred Real‑Time Log Streaming
Streaming with watchLog() delivers new log lines as the container writes them, with no polling and no deduplication needed. The typical pattern is to create a KubernetesClient, locate the target pod, call watchLog(), and read lines from the InputStream returned by LogWatch.getOutput():
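import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
import io.fabric8.kubernetes.client.dsl.LogWatch;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

public class LogStreamer {
    public static void main(String[] args) {
        // KubernetesClientBuilder replaces the deprecated DefaultKubernetesClient in Fabric8 6.x
        try (KubernetesClient client = new KubernetesClientBuilder().build()) {
            // Pod names are lowercase (RFC 1123), so match on a lowercase fragment
            String podName = client.pods()
                    .inNamespace("test")
                    .list()
                    .getItems()
                    .stream()
                    .map(p -> p.getMetadata().getName())
                    .filter(name -> name.contains("funtester-pod"))
                    .findFirst()
                    .orElseThrow(() -> new IllegalArgumentException("Pod not found"));
            // LogWatch.getOutput() is a plain InputStream, so read it line by line
            try (LogWatch watch = client.pods()
                    .inNamespace("test")
                    .withName(podName)
                    .watchLog();
                 BufferedReader reader = new BufferedReader(
                         new InputStreamReader(watch.getOutput(), StandardCharsets.UTF_8))) {
                String line;
                // readLine() blocks until the next line arrives and returns null when the stream ends
                while ((line = reader.readLine()) != null) {
                    System.out.println("Pod Log: " + line);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
In practice the stream may stop after a few minutes (for example, when a server-side or proxy timeout closes the connection), so long-running monitoring needs a reconnection strategy.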
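A minimal reconnection loop could look like the following sketch. It is not part of the Fabric8 API: the hypothetical opener parameter stands in for whatever opens a fresh stream, for example a lambda returning client.pods().inNamespace(ns).withName(pod).watchLog().getOutput(). A production version should also close each LogWatch it opens, and because a fresh watchLog() starts again from the beginning of the container's log, it helps to limit the range (e.g., with sinceSeconds()) or apply one of the deduplication strategies discussed below.

```java
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Callable;
import java.util.function.Consumer;

public class ReconnectingLogReader {
    /**
     * Opens a stream via opener, forwards each line to consumer, and re-opens the stream
     * when it ends or fails, up to maxAttempts times with a fixed back-off.
     * Returns the number of attempts made.
     */
    public static int stream(Callable<InputStream> opener, Consumer<String> consumer,
                             int maxAttempts, long backoffMs) {
        int attempts = 0;
        while (attempts < maxAttempts && !Thread.currentThread().isInterrupted()) {
            attempts++;
            // Exceptions thrown while opening the stream are caught by the catch clause below
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(opener.call(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    consumer.accept(line);
                }
                // readLine() returned null: the stream was closed on the other side, so reconnect
            } catch (Exception e) {
                System.err.println("Log stream failed on attempt " + attempts + ": " + e);
            }
            if (attempts < maxAttempts) {
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // keep the interrupt flag and stop retrying
                    break;
                }
            }
        }
        return attempts;
    }
}
```

A fixed back-off keeps the sketch short; an exponential back-off with a cap is a common refinement when the API server itself is unavailable.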
Periodic Pull Approach
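When streaming is not feasible, logs can be polled periodically with sinceSeconds(). A naive schedule (e.g., pull every 10 s for the last 10 s) can lose entries, because scheduling delays and request latency make consecutive windows drift apart and leave gaps. Pulling a slightly larger window (e.g., 11 s) closes the gaps but makes consecutive windows overlap, so lines in the overlap are returned twice.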
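The trade-off can be made concrete with a small simulation. It models a pull at time p with a window w as returning every line whose timestamp falls in (p − w, p], which simplifies sinceSeconds() (that parameter works in whole seconds); the event and pull times are invented for illustration.

```java
import java.util.Arrays;

public class PullWindowSimulation {
    /**
     * For each event timestamp, counts how many pulls would return it, modelling a pull at
     * time p with a window of windowMs as returning events in (p - windowMs, p].
     */
    public static int[] coverage(long[] eventTimes, long[] pullTimes, long windowMs) {
        int[] counts = new int[eventTimes.length];
        for (long p : pullTimes) {
            for (int i = 0; i < eventTimes.length; i++) {
                if (eventTimes[i] > p - windowMs && eventTimes[i] <= p) {
                    counts[i]++;
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] events = {9_500, 10_100, 19_900, 20_200, 29_000}; // ms
        long[] pulls = {10_000, 20_300, 30_000};                  // second pull runs 300 ms late
        System.out.println(Arrays.toString(coverage(events, pulls, 10_000))); // [1, 0, 1, 2, 1]
        System.out.println(Arrays.toString(coverage(events, pulls, 11_000))); // [2, 1, 2, 2, 1]
    }
}
```

With a 10 s window, the late second pull misses the line at 10.1 s (count 0) and returns the line at 20.2 s twice. Widening the window to 11 s removes the gap but returns more lines twice, which is exactly what a deduplication step has to filter out.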
Deduplication Strategies
Deduplicate by unique log ID (e.g., requestId, traceId) using a Set. Requires periodic cleanup to bound memory usage.
Deduplicate by time window: keep the maximum timestamp processed and ignore any log with a timestamp ≤ that value. Works well when timestamps are strictly increasing.
Deduplicate with an LRU cache: store recent log IDs in a fixed‑size cache (e.g., LinkedHashMap with removeEldestEntry) to bound memory while still eliminating duplicates.
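For the first strategy, one simple way to bound memory without scanning timestamps is to rotate two sets: new IDs go into the current set, a periodic task moves the current set to "previous" and starts a fresh one, and anything older than two periods is forgotten. This two-set rotation is a swapped-in technique rather than the author's code, and RotatingIdSet is an illustrative name; the rotation period must be longer than the pull overlap so repeated IDs are still remembered when they reappear.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Deduplication by unique log ID (requestId, traceId, ...) using two rotating sets.
 * IDs are remembered for between one and two rotation periods, which bounds memory.
 */
public class RotatingIdSet {
    private Set<String> current = new HashSet<>();
    private Set<String> previous = new HashSet<>();

    /** Returns true if the ID was not seen in the current or previous period, and records it. */
    public synchronized boolean firstTime(String id) {
        if (previous.contains(id)) {
            return false;
        }
        return current.add(id);
    }

    /** Call periodically (e.g. from a ScheduledExecutorService); drops IDs older than two periods. */
    public synchronized void rotate() {
        previous = current;
        current = new HashSet<>();
    }

    public synchronized int size() {
        return current.size() + previous.size();
    }
}
```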
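The author selected the time‑window method because Fabric8 can ask Kubernetes to prefix each line with a high-precision timestamp (RFC 3339, typically with nanosecond resolution), which makes it rare for two lines to share a timestamp. Ties are still possible, though: a strict "≤ last timestamp" rule drops any line that shares its timestamp with the last processed one, so the filtering is not strictly lossless.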
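Because a strict `ts > last` comparison discards any line that shares its timestamp with the last processed one, a variant of the time-window filter can also remember which lines it has already delivered at that newest timestamp. The following is a sketch, not the author's code; TimeWindowFilter and accept() are illustrative names, and the timestamp is assumed to have been parsed from the line beforehand.

```java
import java.util.HashSet;
import java.util.Set;

/** Time-window filter that tolerates several lines sharing the newest timestamp. */
public class TimeWindowFilter {
    private long lastTimestamp = Long.MIN_VALUE;
    // Lines already delivered whose timestamp equals lastTimestamp
    private final Set<String> linesAtLast = new HashSet<>();

    /** Returns true if the line should be processed, false if it was already seen. */
    public boolean accept(long ts, String line) {
        if (ts > lastTimestamp) {
            lastTimestamp = ts;
            linesAtLast.clear();
            linesAtLast.add(line);
            return true;
        }
        if (ts == lastTimestamp) {
            // add() returns false when this exact line was already delivered at this timestamp
            return linesAtLast.add(line);
        }
        return false; // older than the newest delivered line: returned by an earlier pull
    }
}
```

One trade-off remains: two genuinely identical lines written at the same instant are collapsed into one.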
Implementation of Time‑Window Deduplication
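import java.io.BufferedReader;
import java.time.Instant;
import java.time.format.DateTimeParseException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// K8sService.client (shared KubernetesClient) and log (SLF4J logger) come from the author's project
static ScheduledExecutorService handleLogs(String namespace, String podName, Consumer<String> consumer) {
    // Newest timestamp delivered so far (epoch nanoseconds); only the scheduler thread touches it
    final long[] lastTimestamp = {Long.MIN_VALUE};
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    scheduler.scheduleAtFixedRate(() -> {
        try (BufferedReader reader = new BufferedReader(K8sService.client.pods()
                .inNamespace(namespace)
                .withName(podName)
                .usingTimestamps()   // prefix every line with an RFC 3339 timestamp
                .sinceSeconds(11)    // 1 s more than the 10 s period, so windows overlap instead of leaving gaps
                .getLogReader())) {
            String line;
            while ((line = reader.readLine()) != null) {
                long ts = extractTimestamp(line);
                // Lines at or before the newest delivered timestamp came from the overlap: skip them
                if (ts > lastTimestamp[0]) {
                    consumer.accept(line);
                    lastTimestamp[0] = ts;
                }
            }
        } catch (Exception e) {
            log.error("Log pull error", e);
        }
    }, 0, 10, TimeUnit.SECONDS);
    return scheduler; // call shutdown() to stop polling
}

// Parses the timestamp prefix, e.g. "2024-05-01T08:00:00.123456789Z message", into epoch nanoseconds
private static long extractTimestamp(String logLine) {
    int space = logLine.indexOf(' ');
    String prefix = space > 0 ? logLine.substring(0, space) : logLine;
    try {
        Instant instant = Instant.parse(prefix);
        return instant.getEpochSecond() * 1_000_000_000L + instant.getNano();
    } catch (DateTimeParseException e) {
        return Long.MIN_VALUE; // no parseable timestamp: the caller's "ts > last" check skips the line
    }
}
LRU Cache with LinkedHashMap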
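Java's LinkedHashMap provides a protected removeEldestEntry() hook, called after each put(), that can be overridden to bound the map's size. The following class stores processed log IDs and evicts the oldest entry once the cache exceeds a configurable size. Because it uses insertion order, eviction is FIFO; passing true as the third constructor argument (access order) turns it into a true LRU cache: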
import java.util.LinkedHashMap;
import java.util.Map;

public class IdCache extends LinkedHashMap<String, Long> {
    private final int maxEntries;

    public IdCache(int maxEntries) {
        super(maxEntries, 0.75f, false); // accessOrder = false: iteration follows insertion order
        this.maxEntries = maxEntries;
    }

    // Called after each put(); returning true evicts the oldest inserted entry
    @Override
    protected boolean removeEldestEntry(Map.Entry<String, Long> eldest) {
        return size() > maxEntries;
    }
}
When combined with timestamp filtering, this cache can guard against duplicate processing while keeping memory usage bounded.
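Wrapping such a map with Collections.newSetFromMap() turns the duplicate check into a single add() call. The sketch below is one possible shape, with SeenIds and markSeen() as illustrative names; the key can be a requestId/traceId or, when no ID exists, the full timestamped log line.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Bounded "seen IDs" set backed by an insertion-ordered LinkedHashMap (FIFO eviction). */
public class SeenIds {
    private final Set<String> seen;

    public SeenIds(int maxEntries) {
        Map<String, Boolean> backing = new LinkedHashMap<>(16, 0.75f, false) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > maxEntries; // evict the oldest inserted ID once over capacity
            }
        };
        this.seen = Collections.newSetFromMap(backing);
    }

    /** Returns true if the ID is new (and records it), false if it is a duplicate still cached. */
    public boolean markSeen(String id) {
        return seen.add(id);
    }

    public int size() {
        return seen.size();
    }
}
```

Re-adding an existing ID does not move it in insertion order, so a frequently repeated ID is still evicted once enough newer IDs arrive.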
Access‑Order LRU Cache with Expiration
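For scenarios where entries should expire after a time interval (e.g., 10 minutes), enable access order and store a timestamp in each value. LinkedHashMap calls removeEldestEntry() after every put(), so the override checks the age of the eldest entry and evicts it once it is older than the configured expiration time. Note that this evicts at most one entry per insertion, and because the timestamp is set when the item is created, reading an entry moves it to the end of the access order without refreshing its age.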
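Since removeEldestEntry() can evict at most one entry per insertion, several expired IDs may linger. As an alternative (not the author's code), the following sketch sweeps every expired entry from the head of an insertion-ordered LinkedHashMap on each add. The class name and the explicit nowMs parameter, used instead of System.currentTimeMillis() to keep it testable, are illustrative choices, and callers are assumed to pass non-decreasing times.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/** TTL-based seen-ID cache that purges every expired entry, not just the eldest, on each add. */
public class ExpiringIdCache {
    // Insertion order: entries are ordered by first-seen time, so expired ones sit at the head
    private final LinkedHashMap<String, Long> firstSeen = new LinkedHashMap<>();
    private final long ttlMs;

    public ExpiringIdCache(long ttlMs) {
        this.ttlMs = ttlMs;
    }

    /** Records the ID at nowMs; returns false if it is already present and not yet expired. */
    public boolean add(String id, long nowMs) {
        purgeExpired(nowMs);
        return firstSeen.putIfAbsent(id, nowMs) == null;
    }

    private void purgeExpired(long nowMs) {
        Iterator<Map.Entry<String, Long>> it = firstSeen.entrySet().iterator();
        while (it.hasNext()) {
            if (nowMs - it.next().getValue() > ttlMs) {
                it.remove();
            } else {
                break; // later entries are newer (given non-decreasing nowMs), so none has expired
            }
        }
    }

    public int size() {
        return firstSeen.size();
    }
}
```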
import java.util.LinkedHashMap;
import java.util.Map;

public class AccessTimeCache<K, V extends AccessTimeCache.CacheItem> extends LinkedHashMap<K, V> {
    private final int capacity;
    private final long expirationMs;

    public AccessTimeCache(int capacity, long expirationMs) {
        super(capacity, 0.75f, true); // accessOrder = true: get() moves an entry to the tail
        this.capacity = capacity;
        this.expirationMs = expirationMs;
    }

    // Called by LinkedHashMap after each put(); evicts at most one entry per call
    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        long age = System.currentTimeMillis() - eldest.getValue().timestamp;
        // Evict the least recently accessed entry if it has expired or the size bound is exceeded
        return age > expirationMs || size() > capacity;
    }

    public static class CacheItem {
        public final String data;
        public final long timestamp; // creation time; not refreshed on access

        public CacheItem(String data) {
            this.data = data;
            this.timestamp = System.currentTimeMillis();
        }
    }
}
Example: Streaming with Tail and Automatic Cleanup
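import io.fabric8.kubernetes.client.dsl.LogWatch;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

// K8sService.client and log come from the author's project (shared client and SLF4J logger)
static void handlePodLogFlow(String namespace, String podName, Consumer<String> consumer) {
    try (LogWatch watch = K8sService.client.pods()
            .inNamespace(namespace)
            .withName(podName)
            .tailingLines(200) // replay the last 200 lines, then follow new output
            .watchLog();
         BufferedReader reader = new BufferedReader(
                 new InputStreamReader(watch.getOutput(), StandardCharsets.UTF_8))) {
        reader.lines().forEach(consumer); // blocks until the stream ends
    } catch (Exception e) {
        log.error("handlePodLogFlow error", e); // try-with-resources has already closed watch and reader
    }
}
Summary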
The Fabric8 Kubernetes client offers both one‑shot log retrieval and continuous streaming. For real‑time monitoring, watchLog() is preferred, but developers must detect stream termination and reconnect. When streaming is unsuitable, periodic polling with sinceSeconds() can be combined with a time‑window deduplication strategy. To keep memory bounded, a cache based on LinkedHashMap can be used: insertion order gives FIFO eviction, access order gives LRU eviction, and either can add timestamp‑based expiration. The code snippets above illustrate practical implementations for both streaming and pull‑based log collection, as well as reusable cache utilities.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact us and we will review it promptly.