
Mastering Distributed Log Architecture: From Flume to ELK and Beyond

This comprehensive guide walks you through the challenges of large‑scale log collection, real‑time processing, storage optimization, and visualization, detailing practical configurations for Flume, Logstash, Elasticsearch, Kibana, Filebeat, Kafka, Kubernetes, and future AIOps integrations to build a reliable, cost‑effective distributed logging system.

Ops Community

Introduction: Why Your System Needs a Powerful Log Architecture

In distributed systems, a robust logging architecture is essential for diagnosing faults quickly and for giving digital-transformation initiatives reliable operational data. This article traces the evolution from traditional Flume pipelines to modern ELK stacks and offers practical guidance for building an efficient, reliable distributed log system.

Core Challenges of Distributed Log Systems

1.1 Scale Challenges

Log volume can grow from 100 GB/day to over 10 TB/day, making traditional grep/awk insufficient. Key issues include real‑time data collection, storage cost optimization, and query performance.
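To make the scale concrete, convert a daily volume into the sustained ingest rate the pipeline must handle. The sketch below assumes an average event size of 500 bytes, which is an illustrative assumption and not a figure from this article. It computes averages only, and peak rates can be considerably higher.

```python
SECONDS_PER_DAY = 24 * 60 * 60  # 86,400


def ingest_rates(daily_bytes: float, avg_event_bytes: float) -> tuple[float, float]:
    """Return (average bytes per second, average events per second) for a daily log volume."""
    bytes_per_s = daily_bytes / SECONDS_PER_DAY
    return bytes_per_s, bytes_per_s / avg_event_bytes


# 500 bytes per event is an assumed average, not a measured value
for label, daily in [("100 GB/day", 100e9), ("10 TB/day", 10e12)]:
    bps, eps = ingest_rates(daily, avg_event_bytes=500)
    print(f"{label}: {bps / 1e6:.1f} MB/s, {eps:,.0f} events/s")
```

At 10 TB/day this works out to about 115.7 MB/s and roughly 231,000 events/s on average.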

1.2 Heterogeneous Environments

Java microservices (Spring Boot)

Python data services

Node.js front‑end services

Nginx/Apache access logs

Docker container logs

Kubernetes cluster logs

Unified collection, parsing, and storage across diverse formats is a major challenge.
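One way to picture unified parsing is a normalizer that maps each source format onto the same few fields that the Elasticsearch template uses later (@timestamp, level, service, message). The sketch below handles only two inputs: JSON application logs and Nginx access lines. Two parts of it are assumptions for illustration: the simplified regex, and the rule that 5xx responses count as ERROR. In production this work is usually done by Logstash grok, Filebeat processors or Fluent Bit parsers.

```python
import json
import re
from datetime import datetime

# Simplified pattern for the start of an Nginx "combined" access-log line (an assumption)
NGINX_RE = re.compile(
    r'(?P<client_ip>\S+) - (?P<user>\S+) \[(?P<time>[^\]]+)\] "(?P<request>[^"]*)" (?P<status>\d{3}) (?P<bytes>\d+)'
)


def normalize(line: str, service: str) -> dict:
    """Map one raw log line onto a shared schema: @timestamp, level, service, message."""
    if line.lstrip().startswith("{"):  # JSON-structured application log
        try:
            rec = json.loads(line)
            return {"@timestamp": rec.get("timestamp"), "level": str(rec.get("level", "INFO")).upper(),
                    "service": service, "message": rec.get("message", "")}
        except json.JSONDecodeError:
            pass  # not valid JSON after all; try the other formats
    m = NGINX_RE.match(line)
    if m:  # web-server access log
        ts = datetime.strptime(m["time"], "%d/%b/%Y:%H:%M:%S %z").isoformat()
        level = "ERROR" if int(m["status"]) >= 500 else "INFO"  # illustrative status-to-level mapping
        return {"@timestamp": ts, "level": level, "service": service, "message": m["request"]}
    # Unknown format: keep the raw line so nothing is silently dropped
    return {"@timestamp": None, "level": "UNKNOWN", "service": service, "message": line}
```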

1.3 Real‑time Monitoring and Alerting

Detect abnormal patterns (e.g., error spikes)

Business metric monitoring (e.g., order success rate)

Intelligent alert deduplication
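Alert deduplication can start as simply as suppressing repeats of the same fingerprint within a time window. The class below is a minimal sketch of that idea and does not describe any particular alerting product. The service:error_type fingerprint and the 300-second default window are assumptions. The clock is injectable so that the behavior can be tested.

```python
import time


class AlertDeduplicator:
    """Suppress repeats of the same alert fingerprint within a time window."""

    def __init__(self, window_seconds: float = 300.0, clock=time.monotonic):
        self.window = window_seconds
        self.clock = clock
        self.last_sent: dict[str, float] = {}   # fingerprint -> time it was last sent
        self.suppressed: dict[str, int] = {}    # fingerprint -> number of suppressed repeats

    def should_send(self, service: str, error_type: str) -> bool:
        key = f"{service}:{error_type}"  # hypothetical fingerprint; real systems may hash more fields
        now = self.clock()
        last = self.last_sent.get(key)
        if last is not None and now - last < self.window:
            self.suppressed[key] = self.suppressed.get(key, 0) + 1
            return False
        self.last_sent[key] = now
        return True
```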

Apache Flume: Traditional Yet Powerful Choice

2.1 Flume Architecture Deep Dive

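A Flume agent moves events through three core components. A Source receives or reads events, a Channel buffers them, and a Sink delivers them to the next hop or final store (for example HDFS or Kafka). Example source configurations: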

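# Two alternative definitions for the same source name (r1); use only one of them.

# Option A: Spooling Directory Source - batch collection of completed files dropped into a directory
agent.sources.r1.type = spooldir
agent.sources.r1.spoolDir = /var/log/flume-spooling
# fileSuffix is applied only with deletePolicy = never; immediate deletes files once ingested
agent.sources.r1.fileSuffix = .COMPLETED
agent.sources.r1.deletePolicy = immediate
agent.sources.r1.batchSize = 1000

# Option B: Taildir Source - real-time tailing of growing files (recommended)
agent.sources.r1.type = TAILDIR
agent.sources.r1.filegroups = f1 f2
# The file-name part of a filegroup path is a regular expression, not a shell glob (*.log becomes .*log)
agent.sources.r1.filegroups.f1 = /var/log/app/.*log
agent.sources.r1.filegroups.f2 = /var/log/nginx/access.log
# Read offsets are persisted here so tailing resumes where it stopped after a restart
agent.sources.r1.positionFile = /var/log/flume/taildir_position.json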
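The Taildir source records how far it has read each file in positionFile, so collection lag can be checked from outside Flume. The helper below is a hypothetical monitoring sketch. It assumes the position file is a JSON array of objects with inode, pos and file keys; verify this layout against your Flume version before relying on it.

```python
import json
import os


def taildir_lag(position_file: str) -> dict[str, int]:
    """Bytes not yet read by the Taildir source, per tracked file that still exists."""
    with open(position_file) as fh:
        entries = json.load(fh)  # assumed: [{"inode": ..., "pos": ..., "file": ...}, ...]
    lag = {}
    for entry in entries:
        path = entry["file"]
        try:
            st = os.stat(path)
        except FileNotFoundError:
            continue  # file was rotated away or deleted
        if st.st_ino != entry["inode"]:
            continue  # the path now points at a different (rotated) file
        lag[path] = max(st.st_size - entry["pos"], 0)
    return lag
```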

2.2 Channel Optimization

# The three channel types below are alternatives; define only one of them for c1.

# Memory Channel - high throughput, but buffered events are lost if the agent process dies
agent.channels.c1.type = memory
agent.channels.c1.capacity = 100000
# transactionCapacity must be at least the largest batchSize of the attached source and sink
agent.channels.c1.transactionCapacity = 10000

# File Channel - survives agent restarts (checkpoint and data files on disk), somewhat slower
agent.channels.c1.type = file
agent.channels.c1.checkpointDir = /data/flume/checkpoint
agent.channels.c1.dataDirs = /data/flume/data
agent.channels.c1.capacity = 1000000
agent.channels.c1.transactionCapacity = 10000

# Kafka Channel - events persisted in a Kafka topic; balances performance and reliability (recommended)
agent.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
agent.channels.c1.kafka.bootstrap.servers = kafka1:9092,kafka2:9092
agent.channels.c1.kafka.topic = flume-channel
agent.channels.c1.kafka.consumer.group.id = flume-consumer
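Channel capacity is counted in events. Dividing it by the ingest rate gives a rough idea of how long a channel can absorb traffic while its sink is stalled, before the source starts failing to put events. The sketch below uses the roughly 15,000 events/s figure from 2.3 only as an example rate. A Kafka channel is bounded by its topic's retention instead.

```python
def buffer_seconds(capacity_events: int, ingest_events_per_s: float) -> float:
    """Seconds of inflow a channel can absorb while its sink is stalled (assumes a steady rate)."""
    return capacity_events / ingest_events_per_s


# 15,000 events/s is used purely as an example rate
for name, capacity in [("memory, capacity=100000", 100_000), ("file, capacity=1000000", 1_000_000)]:
    print(f"{name}: {buffer_seconds(capacity, 15_000):.1f} s")
```

At that rate the memory channel above fills in about 6.7 s and the file channel in about 66.7 s. That difference is one argument for a larger, durable buffer when downstream outages are likely.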

2.3 Flume Optimization Techniques

Raising the HDFS sink's batch size roughly tripled throughput in the reported setup (from about 5,000 to about 15,000 events/s):

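# Before optimization: batchSize=100, throughput ≈ 5,000 events/s
agent.sinks.k1.hdfs.batchSize = 100

# After optimization: batchSize=1000, throughput ≈ 15,000 events/s
# (the channel's transactionCapacity must be >= this batch size)
agent.sinks.k1.hdfs.batchSize = 1000
# Roll to a new HDFS file every 30 s or at 128 MiB (134217728 bytes), whichever comes first
agent.sinks.k1.hdfs.rollInterval = 30
agent.sinks.k1.hdfs.rollSize = 134217728
# 0 disables rolling based on event count
agent.sinks.k1.hdfs.rollCount = 0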
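With rollCount disabled, each HDFS file closes when either rollInterval or rollSize is reached, whichever comes first. The ingest rate therefore decides the typical file size. The sketch below assumes 500-byte events (an assumption) and reuses the before and after rates above. Each sink instance writes its own files.

```python
def first_roll_trigger(bytes_per_s: float, roll_interval_s: float = 30, roll_size_bytes: int = 134_217_728):
    """Return which HDFS sink roll condition fires first and after how many seconds (rollCount = 0)."""
    seconds_to_fill = roll_size_bytes / bytes_per_s
    if seconds_to_fill < roll_interval_s:
        return "rollSize", seconds_to_fill
    return "rollInterval", float(roll_interval_s)


# Assumes 500-byte events at the before/after throughput figures
for events_per_s in (5_000, 15_000):
    trigger, secs = first_roll_trigger(events_per_s * 500)
    print(f"{events_per_s} events/s -> {trigger} after {secs:.1f} s")
```

At 5,000 events/s the 30 s interval fires first, which gives files of about 75 MB. At 15,000 events/s, files reach 128 MiB after roughly 17.9 s.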

The custom interceptor below stamps each event with an ingest timestamp. It flags lines that contain "ERROR" with level and alert headers, and it adds the agent's hostname as a header.

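import java.net.InetAddress;
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.flume.*; // Context, Event
import org.apache.flume.interceptor.Interceptor;
public class LogEnhancerInterceptor implements Interceptor {
    private String hostname = "unknown"; // resolved once in initialize() instead of on every event
    public void initialize() { try { hostname = InetAddress.getLocalHost().getHostName(); } catch (java.io.IOException ignored) { } }
    public Event intercept(Event event) {
        event.getHeaders().put("timestamp", String.valueOf(System.currentTimeMillis()));
        if (new String(event.getBody(), StandardCharsets.UTF_8).contains("ERROR")) { // simple substring check, not a level parser
            event.getHeaders().put("level", "ERROR");
            event.getHeaders().put("alert", "true");
        }
        event.getHeaders().put("hostname", hostname);
        return event;
    }
    public List<Event> intercept(List<Event> events) { events.forEach(this::intercept); return events; }
    public void close() { }
    public static class Builder implements Interceptor.Builder { // configure as interceptors.i1.type = <package>.LogEnhancerInterceptor$Builder
        public Interceptor build() { return new LogEnhancerInterceptor(); }
        public void configure(Context context) { }
    }
}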

ELK Stack: Modern Log System Standard

3.1 Elasticsearch Index Design Best Practices

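For daily volumes around 10 TB, an index template that sets shard count, replica count, refresh behavior and the lifecycle policy up front is crucial. Two settings in the template below deserve care. First, index.translog.durability set to async fsyncs the translog only every sync_interval (30 s here), so a node crash can lose up to that much acknowledged data. Second, the ILM policy named logs-policy must already exist.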

PUT _index_template/logs-template
{
  "index_patterns": ["logs-*"],
  "template": {
    "settings": {
      "number_of_shards": 10,
      "number_of_replicas": 1,
      "refresh_interval": "30s",
      "index.translog.durability": "async",
      "index.translog.sync_interval": "30s",
      "index.lifecycle.name": "logs-policy",
      "index.lifecycle.rollover_alias": "logs-write"
    },
    "mappings": {
      "properties": {
        "@timestamp": {"type": "date"},
        "message": {"type": "text", "fields": {"keyword": {"type": "keyword", "ignore_above": 256}}},
        "level": {"type": "keyword"},
        "host": {"type": "keyword"},
        "service": {"type": "keyword"},
        "trace_id": {"type": "keyword"}
      }
    }
  }
}
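A common sizing guideline from Elastic is to keep shards roughly between 10 GB and 50 GB. At 10 TB/day, a single daily index with 10 primary shards would put about 1 TB in each shard. That is why the template relies on ILM rollover (rollover_alias) rather than one index per day. The arithmetic below is a rough sketch. It assumes that indexed size roughly equals raw log size and that indices roll over at a 50 GB primary-shard target, for example through a max_primary_shard_size rollover condition in logs-policy.

```python
import math


def shard_plan(daily_bytes: float, target_shard_bytes: float, primaries_per_index: int, replicas: int) -> dict:
    """Rough daily shard budget when ILM rolls indices over at a target primary-shard size."""
    primaries_per_day = math.ceil(daily_bytes / target_shard_bytes)
    return {
        "primaries_per_day": primaries_per_day,
        "rollovers_per_day": math.ceil(primaries_per_day / primaries_per_index),
        "shards_per_day": primaries_per_day * (1 + replicas),  # primaries plus replica copies
    }


# Assumptions: 10 TB/day of indexed data, 50 GB per primary shard, template's 10 primaries and 1 replica
print(shard_plan(10e12, 50e9, primaries_per_index=10, replicas=1))
```

This comes to about 200 primary shards and 20 rollovers per day, or 400 shards once replicas are counted. Multiply by the retention period to estimate the cluster's total shard count.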

3.2 Logstash: Powerful Data Processing Pipeline

The following configuration consumes Nginx access logs from Kafka and parses them with grok. It enriches them with GeoIP and user-agent data, tags errors and slow requests, writes the results to Elasticsearch and forwards events to an alerting endpoint.

input {
  kafka {
    bootstrap_servers => "kafka1:9092,kafka2:9092,kafka3:9092"
    topics => ["nginx-logs"]
    group_id => "logstash-consumer"
    codec => "json"
    consumer_threads => 4
  }
}
filter {
  # Nginx "combined" format with $request_time appended
  grok { match => { "message" => '%{IPORHOST:client_ip} - %{USER:ident} \[%{HTTPDATE:timestamp}\] "%{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:http_version}" %{NUMBER:status} %{NUMBER:bytes} "%{DATA:referrer}" "%{DATA:user_agent}" %{NUMBER:request_time}' } }
  date { match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"] target => "@timestamp" }
  geoip { source => "client_ip" target => "geoip" }
  useragent { source => "user_agent" target => "ua" }
  # Logstash hash entries are separated by spaces, not commas
  mutate {
    add_field => { "environment" => "production" "datacenter" => "dc1" }
    convert => { "status" => "integer" "bytes" => "integer" "request_time" => "float" }
  }
  if [status] >= 400 { mutate { add_tag => ["error"] add_field => { "alert_level" => "high" } } }
  if [request_time] > 3 { mutate { add_tag => ["slow_request"] } }
}
output {
  elasticsearch {
    hosts => ["es1:9200", "es2:9200", "es3:9200"]
    # nginx-logs-* is not matched by the "logs-*" template in 3.1; align the names if the template should apply
    index => "nginx-logs-%{+YYYY.MM.dd}"
    # Name of an Elasticsearch ingest pipeline that must already exist
    pipeline => "nginx-pipeline"
  }
  # Send only error events to the alerting endpoint, not every access-log line
  if "error" in [tags] {
    http { url => "http://alert-system/api/alert" http_method => "post" format => "json" }
  }
}

3.3 Kibana: The Art of Data Visualization

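Kibana dashboards can cover real-time error-rate monitoring, service call-chain tracing (for example by filtering on trace_id) and alert rules, all driven by Elasticsearch queries and aggregations. The query below counts ERROR events in 30-second buckets over the last five minutes: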

{
  "size": 0,
  "query": {"bool": {"filter": [{"range": {"@timestamp": {"gte": "now-5m"}}}, {"term": {"level": "ERROR"}}]}},
  "aggs": {"errors_per_30s": {"date_histogram": {"field": "@timestamp", "fixed_interval": "30s"}}}
}
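The query above returns error counts rather than a rate. One way to get a rate is to build a histogram of all events, count errors per bucket with a filter sub-aggregation, and divide. The request body and the 5% threshold below are illustrative assumptions. error_rates() only post-processes the buckets of such a response.

```python
# Hypothetical request body: a 30 s histogram over all levels, with a filter
# sub-aggregation that counts ERROR events inside each bucket
ERROR_RATE_QUERY = {
    "size": 0,
    "query": {"range": {"@timestamp": {"gte": "now-5m"}}},
    "aggs": {
        "per_30s": {
            "date_histogram": {"field": "@timestamp", "fixed_interval": "30s"},
            "aggs": {"errors": {"filter": {"term": {"level": "ERROR"}}}},
        }
    },
}


def error_rates(buckets: list, threshold: float = 0.05) -> list:
    """Turn date_histogram buckets into (bucket key, error rate, above_threshold) tuples."""
    result = []
    for b in buckets:
        total = b["doc_count"]
        rate = b["errors"]["doc_count"] / total if total else 0.0  # empty bucket -> rate 0
        result.append((b["key_as_string"], rate, rate > threshold))
    return result
```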

Filebeat + Kafka: Lightweight Collection Solution

4.1 Why Choose Filebeat

Low memory usage (typically under 50 MB)

Low CPU usage (typically under 5%)

Native container support

Rich built‑in modules

4.2 Filebeat Advanced Configuration

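filebeat.inputs:
# The "log" input is deprecated since Filebeat 7.16 in favor of "filestream"; the options below are for "log"
- type: log
  enabled: true
  paths:
    - /var/log/app/*.log
  # Lines that do not start with a date are appended to the preceding line (e.g. stack traces)
  multiline.pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}'
  multiline.negate: true
  multiline.match: after
  fields:
    service: user-service
    env: production
  fields_under_root: true        # custom fields land at the event root, e.g. [service]
  harvester_buffer_size: 32768
  max_bytes: 10485760            # truncate single events larger than 10 MiB
  close_inactive: 5m
  ignore_older: 48h              # must be enabled when clean_inactive is set
  clean_inactive: 72h            # must exceed ignore_older + scan_frequency
  scan_frequency: 10s
  processors:
    - add_host_metadata: {}
    - add_docker_metadata: {host: "unix:///var/run/docker.sock"}
    - add_kubernetes_metadata: {in_cluster: true}
    - drop_event:
        when:
          regexp:
            message: ".*health.*check.*"
    - script:
        lang: javascript
        id: my_filter
        source: |
          function process(event) {
            var message = event.Get("message");
            if (message && message.indexOf("ERROR") !== -1) { event.Put("alert", true); }
          }

output.kafka:
  hosts: ["kafka1:9092", "kafka2:9092", "kafka3:9092"]
  # fields_under_root: true moves "service" to the root, so reference [service], not [fields.service]
  topic: 'logs-%{[service]}'
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
  bulk_max_size: 2048
  # Publish retries and initial backoff after a failure
  max_retries: 3
  backoff.init: 250ms

# Monitoring settings are top-level, not part of output.kafka
monitoring.enabled: true
monitoring.elasticsearch:
  hosts: ["es-monitor:9200"]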
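The multiline settings (negate: true, match: after) append any line that does not start with a YYYY-MM-DD date to the preceding event, which keeps Java stack traces together. The function below approximates that grouping in Python to show the behavior. It is not Filebeat's implementation, and it ignores limits such as multiline.max_lines.

```python
import re

# Same pattern as multiline.pattern in the Filebeat config
START = re.compile(r"^[0-9]{4}-[0-9]{2}-[0-9]{2}")


def group_multiline(lines):
    """Approximate negate: true + match: after: non-matching lines join the previous event."""
    events = []
    for line in lines:
        if START.match(line) or not events:
            events.append(line)  # a dated line (or the very first line) starts a new event
        else:
            events[-1] += "\n" + line  # continuation line, e.g. part of a stack trace
    return events
```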

Cloud‑Native Logging in Kubernetes

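Deploy Fluent Bit as a DaemonSet so that one collector runs on every node. It collects container logs, enriches them with Kubernetes metadata and ships them to Elasticsearch. The fluent-bit ServiceAccount referenced in the manifest is not shown. For the kubernetes filter to look up pod metadata, that ServiceAccount needs RBAC permission to get, list and watch pods and namespaces.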

apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: fluent-bit
  namespace: logging
spec:
  selector:
    matchLabels:
      app: fluent-bit
  template:
    metadata:
      labels:
        app: fluent-bit
    spec:
      serviceAccountName: fluent-bit
      containers:
        - name: fluent-bit
          image: fluent/fluent-bit:1.9
          resources:
            limits:
              memory: 200Mi
            requests:
              cpu: 100m
              memory: 100Mi
          volumeMounts:
            - name: varlog
              mountPath: /var/log
            - name: varlibdockercontainers
              mountPath: /var/lib/docker/containers
              readOnly: true
            - name: config
              mountPath: /fluent-bit/etc/
      volumes:
        - name: varlog
          hostPath:
            path: /var/log
        - name: varlibdockercontainers
          hostPath:
            path: /var/lib/docker/containers
        - name: config
          configMap:
            name: fluent-bit-config
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: fluent-bit-config
  namespace: logging
data:
  fluent-bit.conf: |
    [SERVICE]
        Flush        5
        Daemon       Off
        Log_Level    info
        Parsers_File parsers.conf
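    [INPUT]
        Name              tail
        Path              /var/log/containers/*.log
        # "docker" parses Docker json-file logs; on containerd/CRI-O nodes use "multiline.parser docker, cri" instead
        Parser            docker
        Tag               kube.*
        Refresh_Interval  5
        Mem_Buf_Limit     50MB
        Skip_Long_Lines   On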
    [FILTER]
        Name                kubernetes
        Match               kube.*
        Kube_URL            https://kubernetes.default.svc:443
        Kube_CA_File        /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
        Kube_Token_File     /var/run/secrets/kubernetes.io/serviceaccount/token
        Merge_Log           On
        K8S-Logging.Parser On
        K8S-Logging.Exclude On
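    [OUTPUT]
        Name         es
        Match        *
        Host         elasticsearch.logging.svc.cluster.local
        Port         9200
        Index        k8s-logs
        Type         _doc
        Buffer_Size  512KB
        Retry_Limit  5
  # Mounting this ConfigMap over /fluent-bit/etc/ hides the image's default parsers.conf,
  # so the "docker" parser referenced via Parsers_File and Parser must be defined here
  parsers.conf: |
    [PARSER]
        Name         docker
        Format       json
        Time_Key     time
        Time_Format  %Y-%m-%dT%H:%M:%S.%L
        Time_Keep    On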
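The kubernetes filter can enrich records because the tail input tags each record with its file path: kube.* expands to kube.var.log.containers.<file name>. The kubelet names those files <pod>_<namespace>_<container>-<container id>.log. The filter takes the pod and namespace from that name and then asks the API server for labels and annotations. The sketch below reproduces only the name-parsing step, using an illustrative regex.

```python
import re

# Kubelet symlink naming under /var/log/containers: <pod>_<namespace>_<container>-<64-hex container id>.log
CONTAINER_LOG_RE = re.compile(
    r"^(?P<pod>[^_]+)_(?P<namespace>[^_]+)_(?P<container>.+)-(?P<container_id>[0-9a-f]{64})\.log$"
)


def pod_from_log_path(path: str):
    """Extract pod, namespace, container, and container id from a log path, or None if it does not match."""
    m = CONTAINER_LOG_RE.match(path.rsplit("/", 1)[-1])  # match on the file name only
    return m.groupdict() if m else None
```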

Performance Tuning and Troubleshooting

7.1 Common Performance Issues

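Issue 1: Elasticsearch write slowdown – Symptoms: growing end-to-end log latency and a rising Kafka consumer backlog. Solutions:
- Lengthen the refresh interval.
- Send larger requests through the bulk API.
- Raise the primary shard count in the index template. This affects only newly created indices; existing indices keep their shard count unless they are split or reindexed.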

# Increase refresh interval
PUT /logs-*/_settings
{
  "index.refresh_interval": "30s"
}
# Bulk request size is set in the writing client (e.g. Logstash pipeline.batch.size, Filebeat bulk_max_size), not in index settings
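The _bulk API takes newline-delimited JSON: an action line followed by the document, each ending in a newline, with the whole body ending in a newline. The generator below sketches client-side batching that starts a new request when either a document-count limit or a byte limit would be exceeded. The 5,000-document and 10 MiB defaults are illustrative starting points to tune, not recommendations from this article. With the official Python client, elasticsearch.helpers.streaming_bulk offers similar control through its chunk_size and max_chunk_bytes parameters.

```python
import json
from typing import Iterable, Iterator


def bulk_bodies(docs: Iterable[dict], index: str, max_docs: int = 5000,
                max_bytes: int = 10 * 1024 * 1024) -> Iterator[str]:
    """Yield NDJSON bodies for POST /_bulk, starting a new body when a limit would be exceeded."""
    lines, size, count = [], 0, 0
    for doc in docs:
        pair = json.dumps({"index": {"_index": index}}) + "\n" + json.dumps(doc) + "\n"
        pair_bytes = len(pair.encode("utf-8"))
        # Flush the current body first; a single oversized document is still sent on its own
        if count and (count >= max_docs or size + pair_bytes > max_bytes):
            yield "".join(lines)
            lines, size, count = [], 0, 0
        lines.append(pair)
        size += pair_bytes
        count += 1
    if lines:
        yield "".join(lines)
```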

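Issue 2: Slow query response – Put exact-match and range conditions in filter context, where they skip relevance scoring and frequently used filters can be cached. Set size to 0 when only aggregations are needed, and aggregate on keyword fields rather than analyzed text.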

{
  "size": 0,
  "query": {
    "bool": {
      "filter": [
        {"term": {"level": "ERROR"}},
        {"range": {"@timestamp": {"gte": "now-1h"}}}
      ]
    }
  },
  "aggs": {
    "error_services": {
      "terms": {"field": "service", "size": 10, "execution_hint": "map"}
    }
  }
}

7.2 Failure Recovery Process

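When the ES cluster turns red, at least one primary shard is unassigned. Work through recovery in this order:
1. Locate the unassigned shards and the reason for each with GET _cat/shards?v&h=index,shard,prirep,state,unassigned.reason and GET _cluster/allocation/explain.
2. Free or add capacity by lowering replica counts on less critical indices or by expanding storage.
3. If allocation was disabled (for example during maintenance), re-enable it and temporarily speed up recovery with a command like the following. Reset these settings to null once the cluster is green again.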

PUT _cluster/settings
{
  "transient": {
    "cluster.routing.allocation.enable": "all",
    "cluster.routing.allocation.node_concurrent_recoveries": 10,
    "indices.recovery.max_bytes_per_sec": "200mb"
  }
}
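Triage usually starts with the list of unassigned shards. The helper below is a sketch that takes the JSON returned by GET _cat/shards?format=json&h=index,shard,prirep,state,unassigned.reason. It assumes that response is a list of objects keyed by those column names. It separates unassigned primaries, which make the cluster red, from unassigned replicas, which only make it yellow.

```python
def unassigned_shards(cat_shards: list) -> dict:
    """Split unassigned shards into primaries (cluster red) and replicas (cluster yellow)."""
    result = {"primary": [], "replica": []}
    for s in cat_shards:
        if s.get("state") != "UNASSIGNED":
            continue
        kind = "primary" if s.get("prirep") == "p" else "replica"
        reason = s.get("unassigned.reason") or "unknown"
        result[kind].append(f"{s['index']}[{s['shard']}] ({reason})")
    return result
```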

Security and Compliance

8.1 Log Redaction

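# Logstash redaction filter: rules run top to bottom, so an earlier mask can pre-empt a later one
filter {
  # 18-character ID numbers (last character may be X): keep only the last 3 characters
  mutate { gsub => ["message", "\b\d{15}(\d{2}[\dXx])\b", "***************\1"] }
  # Alternative that keeps the 8-digit birth date instead; it never matches after the rule above, so use one or the other
  # mutate { gsub => ["message", "\b\d{6}(\d{8})\d{3}[\dXx]\b", "******\1****"] }
  # 11-digit mobile numbers: keep the last 4 digits
  mutate { gsub => ["message", "\b1[3-9]\d{5}(\d{4})\b", "1XX****\1"] }
  # 16-19 digit card numbers: keep the first and last 4 digits; skip events whose message is not a string
  ruby { code => "m = event.get('message'); event.set('message', m.gsub(/\b([0-9]{4})[0-9]{8,11}([0-9]{4})\b/, '\\1********\\2')) if m.is_a?(String)" }
}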
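Masking rules are easier to get right when they are tested offline before going into Logstash. The Python version below sketches the same three kinds of rule: 18-character ID numbers, 11-digit mobile numbers, and card numbers of 16 to 19 digits. Its ID rule also accepts a trailing X. The patterns and sample values are illustrative. Rules run in order, so the ID rule masks an 18-digit card number before the card rule sees it.

```python
import re

# Ordered (pattern, replacement) pairs mirroring the Logstash rules
RULES = [
    (re.compile(r"\b\d{15}(\d{2}[\dXx])\b"), "*" * 15 + r"\1"),                  # 18-char ID: keep last 3
    (re.compile(r"\b1[3-9]\d{5}(\d{4})\b"), r"1XX****\1"),                        # mobile: keep last 4
    (re.compile(r"\b([0-9]{4})[0-9]{8,11}([0-9]{4})\b"), r"\1" + "*" * 8 + r"\2"),  # card: keep first/last 4
]


def redact(message: str) -> str:
    """Apply each masking rule in order to one log message."""
    for pattern, replacement in RULES:
        message = pattern.sub(replacement, message)
    return message
```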

8.2 Access Control and Auditing

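# Elasticsearch security settings (elasticsearch.yml)
xpack.security.enabled: true
# Transport TLS also needs certificate settings such as xpack.security.transport.ssl.keystore.path
xpack.security.transport.ssl.enabled: true
xpack.security.audit.enabled: true
# 6.x only: the "index" audit output and this setting were removed in 7.0, where audit events go to a log file
xpack.security.audit.outputs: [index, logfile]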

# Role definition allowing read‑only access to non‑sensitive logs
PUT _security/role/logs_reader
{
  "indices": [{
    "names": ["logs-*"],
    "privileges": ["read", "view_index_metadata"],
    "query": {"bool": {"must_not": {"term": {"sensitive": true}}}}
  }]
}

Future Outlook: AIOps and Intelligent Log Analysis

9.1 Anomaly Detection Integration

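from elasticsearch import Elasticsearch  # elasticsearch-py 8.x client
from sklearn.ensemble import IsolationForest
import numpy as np

class LogAnomalyDetector:
    def __init__(self, hosts="http://es:9200"):
        self.es = Elasticsearch(hosts)
        self.model = IsolationForest(contamination=0.1, random_state=42)

    def hourly_error_counts(self, since="now-7d"):
        # Aggregate server-side instead of pulling raw documents: one ERROR count per hour
        resp = self.es.search(
            index="logs-*", size=0,
            query={"bool": {"filter": [{"range": {"@timestamp": {"gte": since}}},
                                       {"term": {"level": "ERROR"}}]}},
            aggs={"per_hour": {"date_histogram": {"field": "@timestamp", "fixed_interval": "1h"}}},
        )
        buckets = resp["aggregations"]["per_hour"]["buckets"]
        return np.array([[b["doc_count"]] for b in buckets], dtype=float)

    def train(self):
        self.model.fit(self.hourly_error_counts())

    def detect(self, current_data):
        # current_data: recent hourly error counts; predict() labels outliers with -1
        labels = self.model.predict(np.asarray(current_data, dtype=float).reshape(-1, 1))
        if (labels == -1).any():
            self.send_alert("Anomaly detected in log patterns")

    def send_alert(self, message):
        print(message)  # placeholder: forward to the alerting system here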

9.2 Intelligent Log Clustering

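from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.cluster import DBSCAN

def cluster_error_logs(logs):
    """Group similar error messages and return one representative message per cluster."""
    # TF-IDF vectors over the 100 most frequent terms; rows are L2-normalized by default,
    # so DBSCAN's Euclidean distance here tracks cosine similarity
    vectorizer = TfidfVectorizer(max_features=100)
    X = vectorizer.fit_transform(logs)
    clustering = DBSCAN(eps=0.3, min_samples=5)
    clusters = clustering.fit_predict(X)  # label -1 marks noise (messages in no dense cluster)
    representative = {}
    for cid in set(clusters):
        if cid != -1:
            cluster_logs = [logs[i] for i, c in enumerate(clusters) if c == cid]
            representative[cid] = cluster_logs[0]  # first message stands in for the whole cluster
    return representative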

Conclusion: Core Takeaways for Building an Efficient Log System

Layered Architecture Design: Decouple collection, transport, storage, and analysis for independent scaling.

Performance‑First Mindset: Optimize at design time rather than retrofitting.

Cost Awareness: Use lifecycle management to reduce storage expenses.

Automated Operations: Implement monitoring, alerting, and automated recovery.

Security and Compliance: Apply log redaction and strict access controls from the source.

Continuous Optimization: Evolve the architecture as business needs grow.

There is no perfect logging system; the goal is to craft a solution that best fits your specific workload and operational requirements.

Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.
