
How to Build a Scalable Kubernetes Logging Pipeline with EFK and Fluentd

This article explains how to collect, process, and visualize Flink job logs on Kubernetes using an EFK stack with Fluentd, covering logging architectures, deployment of Elasticsearch, Kibana, and Fluentd, and the backend logic for querying and displaying logs in a feature platform.

Huolala Tech

Background

The feature platform is designed for data scientists, data engineers, and machine learning engineers to build AI feature stores for model training, testing, and other data usage, solving problems such as scattered data storage, feature duplication, complex extraction, and difficult usage. It pulls data from HBase, Hive, relational databases, and other ODS layers into a unified feature store. The core uses Flink for ETL, deployed on a Kubernetes cluster, which raises the need to collect and present Flink job logs (driver, JobManager, TaskManager). Kubernetes does not provide a Flink‑on‑YARN‑like log access method, so a solution for log collection, storage, extraction, and display is required.

This article shares common Kubernetes logging solutions and the end‑to‑end EFK‑based log pipeline for PyFlink on Kubernetes.

Kubernetes Logging Solutions

Containers write logs to stdout and stderr, which the container runtime stores as JSON files on the host (with Docker's json-file logging driver). These logs are typically viewed with kubectl logs or docker logs. To retain logs after pod deletion or node failure, Kubernetes relies on cluster-level logging, for which three typical approaches are recommended:

Deploy a DaemonSet with a logging agent on each node.

Include a sidecar container in the pod to collect application logs.

Push logs directly from the application to a backend storage.

1. DaemonSet Logging Agent

Install a logging agent (commonly Fluentd, forwarding to Elasticsearch) on every node and mount the host directory /var/log/containers so the agent can forward logs to a backend. This method is non‑intrusive to the application, but it only captures logs written to stdout / stderr.
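
The files under /var/log/containers are symlinks named <pod>_<namespace>_<container>-<container id>.log, which is what lets a node-level agent attribute each line to a pod without any cooperation from the application. A minimal Python sketch of that parsing follows. It assumes a 64-character hexadecimal container ID, and the function name and sample file name are hypothetical:

```python
import re
from typing import Optional

# Expected shape: <pod>_<namespace>_<container>-<64 hex chars>.log
_LOG_NAME = re.compile(
    r"^(?P<pod>[^_]+)_(?P<namespace>[^_]+)_(?P<container>.+)"
    r"-(?P<container_id>[0-9a-f]{64})\.log$"
)


def parse_container_log_name(file_name: str) -> Optional[dict]:
    """Split a /var/log/containers file name into its metadata parts."""
    match = _LOG_NAME.match(file_name)
    return match.groupdict() if match else None


# Hypothetical TaskManager pod of a feature job.
sample = (
    "feature-job-1-taskmanager-1-1_flink_flink-main-container-"
    + "ab" * 32
    + ".log"
)
print(parse_container_log_name(sample))
```

Because the pod name is the first component, a glob such as feature*.log is enough to restrict collection to pods whose names start with that prefix.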

2. Sidecar Container

When applications write logs to files instead of stdout / stderr, a sidecar container can mount the log directory, read the files, and re‑emit the logs to stdout / stderr for collection by the DaemonSet method. This introduces log duplication and increased storage usage.
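
As a rough illustration of what such a streaming sidecar does, the Python sketch below copies complete lines from a log file to an output stream and remembers how far it has read. The function name is hypothetical, and a real sidecar would call it in a loop against a file on a volume shared with the application container:

```python
import io
import sys
from typing import TextIO


def reemit_new_lines(log_file: TextIO, out: TextIO, offset: int = 0) -> int:
    """Copy complete lines written after `offset` to `out`.

    Returns the offset to resume from on the next call. A trailing
    partial line (no newline yet) is left for the next call.
    """
    log_file.seek(offset)
    while True:
        line = log_file.readline()
        if not line.endswith("\n"):
            break  # end of file, or a line that is still being written
        out.write(line)
        offset = log_file.tell()
    out.flush()
    return offset


# In-memory stand-in for an application log file.
app_log = io.StringIO("job started\ncheckpoint 1 done\ncheckpoint 2 in prog")
resume_at = reemit_new_lines(app_log, sys.stdout)
```

Every line now exists twice on the node, once in the application's file and once in the container log, which is the duplication cost mentioned above.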

3. Direct Log Push

Applications can push logs directly to a remote store. This requires logging logic inside every application, and logs that bypass stdout / stderr are not available through kubectl logs.

The first method is officially recommended because it is simple to manage, works with kubectl logs and docker logs, and has minimal impact on application performance. The feature platform adopts this method: Fluentd collects container logs, enriches them with Kubernetes metadata, and forwards them to Elasticsearch.

Fluentd System

Fluentd is an open‑source data collector that handles log ingestion, processing, and forwarding in a single system. Its rich plugin ecosystem allows it to collect logs from various sources, transform them into user‑defined formats, and send them to chosen storage backends, freeing users from complex log‑processing pipelines.

The basic workflow is:

Fluentd gathers data from multiple log sources.

It tags and structures the data according to configured filters.

It routes the data to one or more destination services.

Fluentd is written primarily in Ruby, with performance‑critical parts in C. It is highly extensible and consumes relatively few resources, making it suitable for most enterprise environments.

In a Kubernetes cluster, Fluentd runs on each node, reads container log files, applies filters and transformations, and forwards the enriched logs to an Elasticsearch cluster. Applications can then query Elasticsearch directly or visualize logs with Kibana.
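
Routing in Fluentd is driven by tags: each match block declares a tag pattern, and an event goes to the first block whose pattern matches. The Python sketch below is a simplified model of that behavior, not Fluentd's actual implementation. It assumes only the two wildcards used in this article (* for exactly one tag part, ** for zero or more parts), and the function names are hypothetical:

```python
from typing import List, Optional, Sequence, Tuple


def tag_matches(pattern: str, tag: str) -> bool:
    """Match a dot-separated tag against a pattern with * and ** wildcards."""

    def walk(p: Sequence[str], t: Sequence[str]) -> bool:
        if not p:
            return not t
        head, rest = p[0], p[1:]
        if head == "**":
            # "**" may absorb zero or more tag parts.
            return any(walk(rest, t[i:]) for i in range(len(t) + 1))
        if not t:
            return False
        if head == "*" or head == t[0]:
            return walk(rest, t[1:])
        return False

    return walk(pattern.split("."), tag.split("."))


def route(tag: str, rules: List[Tuple[str, str]]) -> Optional[str]:
    """Return the destination of the first rule whose pattern matches."""
    for pattern, destination in rules:
        if tag_matches(pattern, tag):
            return destination
    return None


# Ordered like the match blocks in this article: specific rule first, catch-all last.
rules = [("zepplinlog", "zepplin_"), ("**", "feature_")]
print(route("zepplinlog", rules))
print(route("kubernetes.var.log.containers.feature-job.log", rules))
```

Order matters in this model: if the catch-all ** rule came first, the zepplinlog rule would never be reached.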

EFK Log System Implementation

Overall Business Flow

Flink tasks run in a Kubernetes cluster, generating logs that are stored on each node under /var/log/containers. A DaemonSet runs Fluentd on every node, mounts the log directory, processes the logs, and pushes them to Elasticsearch. The stored logs are queried by the feature platform backend and visualized in Kibana.
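
Each line in those files is a small JSON object. The Python sketch below shows roughly what parsing one line involves, assuming Docker's json-file format with log, stream, and time fields and a nanosecond-precision UTC timestamp. The function name and the sample line are hypothetical:

```python
import json
from datetime import datetime, timezone


def parse_json_log_line(raw: str) -> dict:
    """Parse one json-file log line into text, stream, and a UTC datetime."""
    entry = json.loads(raw)
    # datetime stores microseconds at most, so trim the nanosecond fraction.
    stamp = entry["time"].rstrip("Z")
    date_part, _, fraction = stamp.partition(".")
    micros = (fraction + "000000")[:6]
    when = datetime.strptime(f"{date_part}.{micros}", "%Y-%m-%dT%H:%M:%S.%f")
    return {
        "log": entry["log"].rstrip("\n"),
        "stream": entry["stream"],
        "time": when.replace(tzinfo=timezone.utc),
    }


raw_line = (
    '{"log": "Job feature-job-1 switched to RUNNING\\n", '
    '"stream": "stdout", "time": "2021-03-01T08:15:30.123456789Z"}'
)
print(parse_json_log_line(raw_line))
```

The application's own message, including any level or timestamp it printed, arrives as opaque text in the log field. Anything finer-grained has to be extracted by later filters.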

Elasticsearch Deployment on Kubernetes

Elasticsearch can be deployed using the official ECK operator or manually via YAML manifests. Below are example RBAC rules and a StatefulSet definition for a two‑node Elasticsearch cluster.

# RBAC authn and authz
apiVersion: v1
kind: ServiceAccount
metadata:
  name: elasticsearch-logging
  namespace: kube-system
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: elasticsearch-logging
subjects:
- kind: ServiceAccount
  name: elasticsearch-logging
  namespace: kube-system
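roleRef:
  kind: ClusterRole
  # Binds the built-in admin ClusterRole; a narrower role is preferable where possible
  name: admin
  apiGroup: rbac.authorization.k8s.io
---
# Elasticsearch StatefulSet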
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: elasticsearch-logging
  namespace: kube-system
spec:
  serviceName: elasticsearch-logging
  replicas: 2
  selector:
    matchLabels:
      k8s-app: elasticsearch-logging
  template:
    metadata:
      labels:
        k8s-app: elasticsearch-logging
    spec:
      serviceAccountName: elasticsearch-logging
      containers:
      - name: elasticsearch-logging
        image: quay.io/fluentd_elasticsearch/elasticsearch:v7.4.2
        ports:
        - containerPort: 9200
          name: db
        - containerPort: 9300
          name: transport
        resources:
          limits:
            cpu: 1000m
            memory: 3Gi
          requests:
            cpu: 100m
            memory: 3Gi
        volumeMounts:
        - name: elasticsearch-logging
          mountPath: /data
        env:
        - name: NAMESPACE
          valueFrom:
            fieldRef:
              fieldPath: metadata.namespace
        - name: MINIMUM_MASTER_NODES
          value: "1"
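      # volumes and initContainers are pod-level fields, siblings of containers
      volumes:
      - name: elasticsearch-logging
        # emptyDir data is lost when the pod is removed from its node
        emptyDir: {}
      initContainers:
      # Raise vm.max_map_count on the node before Elasticsearch starts
      - name: elasticsearch-logging-init
        image: alpine:3.6
        command: ["/sbin/sysctl", "-w", "vm.max_map_count=262144"]
        securityContext:
          privileged: true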

To expose the cluster externally, a NodePort Service is created:

apiVersion: v1
kind: Service
metadata:
  name: elasticsearch-logging
  namespace: kube-system
spec:
  type: NodePort
  ports:
  - name: db
    port: 9200
    targetPort: 9200
  - name: transport
    port: 9300
    targetPort: 9300
  selector:
    k8s-app: elasticsearch-logging
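
Once the Service is up, a quick way to confirm that both nodes joined the cluster is the _cluster/health endpoint. The Python sketch below is one possible check using only the standard library. The function names are hypothetical, and the node address and NodePort are deliberately left as placeholders:

```python
import json
import urllib.request


def fetch_cluster_health(base_url: str, timeout: float = 5.0) -> dict:
    """GET <base_url>/_cluster/health and return the decoded JSON body."""
    with urllib.request.urlopen(f"{base_url}/_cluster/health", timeout=timeout) as resp:
        return json.load(resp)


def cluster_is_ready(health: dict, expected_nodes: int = 2) -> bool:
    """True when the cluster is green or yellow and has enough nodes."""
    return (
        health.get("status") in ("green", "yellow")
        and health.get("number_of_nodes", 0) >= expected_nodes
    )


# Against a live cluster (placeholders, not real values):
#   health = fetch_cluster_health("http://<node-ip>:<node-port>")
sample_health = {"status": "yellow", "number_of_nodes": 2}
print(cluster_is_ready(sample_health))
```

Yellow is accepted here because it only means some replica shards are unassigned, while red means at least one primary shard is unavailable.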

Kibana Deployment on Kubernetes

Kibana, part of the Elastic Stack, provides visualization and analysis of data stored in Elasticsearch. Deploy it with a standard Deployment manifest:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: kibana-logging
  namespace: kube-system
spec:
  replicas: 1
  selector:
    matchLabels:
      k8s-app: kibana-logging
  template:
    metadata:
      labels:
        k8s-app: kibana-logging
    spec:
      containers:
      - name: kibana-logging
        image: docker.elastic.co/kibana/kibana-oss:7.4.2
        env:
        - name: ELASTICSEARCH_HOSTS
          value: http://elasticsearch-logging:9200
        - name: SERVER_NAME
          value: kibana-logging
        - name: SERVER_REWRITEBASEPATH
          value: "false"
        ports:
        - containerPort: 5601
          name: ui
        livenessProbe:
          httpGet:
            path: /api/status
            port: ui
          initialDelaySeconds: 5
          timeoutSeconds: 10
        readinessProbe:
          httpGet:
            path: /api/status
            port: ui
          initialDelaySeconds: 5
          timeoutSeconds: 10

Fluentd Deployment on Kubernetes

Create a ConfigMap that contains Fluentd configuration files, then deploy a DaemonSet with the necessary RBAC permissions.

apiVersion: v1
kind: ConfigMap
metadata:
  name: fluentd-es-config-v0.2.0
  namespace: kube-system
data:
  containers.input.conf: |-
    <source>
      @type tail
      path /var/log/containers/feature*.log
      pos_file /var/log/es-containers.log.pos
      time_format %Y-%m-%dT%H:%M:%S.%NZ
      tag "kubernetes.*"
      format json
      read_from_head true
    </source>
    <source>
      @type tail
      path /var/log/containers/zeppelin-server-*.log
      pos_file /var/log/zepplin-containers.log.pos
      time_format %Y-%m-%dT%H:%M:%S.%NZ
      tag zepplinlog
      format json
      read_from_head true
    </source>
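    # Attach namespace, pod, container, and label metadata;
    # the filters below read record["kubernetes"], which this filter populates.
    <filter kubernetes.**>
      @id filter_kubernetes_metadata
      @type kubernetes_metadata
    </filter>
    <filter kubernetes.**>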
      @type record_transformer
      @id filter_rt_kube_logs
      enable_ruby true
      <record>
        hostname ${ENV['FLUENT_HOSTNAME']}-${record["kubernetes"]["namespace_name"]}-${record["kubernetes"]["pod_name"]}
        program ${record["kubernetes"]["container_name"]}
        severity info
        facility local0
        message ${record['log']}
      </record>
    </filter>
    <filter kubernetes.**>
      @type record_transformer
      remove_keys $.docker.container_id,$.kubernetes.container_image_id,$.kubernetes.pod_id,$.kubernetes.namespace_id,$.kubernetes.master_url,$.kubernetes.labels.pod-template-hash
    </filter>
    <match zepplinlog>
      @id zepplin_es
      @type elasticsearch
      @log_level info
      type_name _doc
      include_tag_key true
      host elasticsearch-logging
      port 9200
      logstash_format true
      logstash_prefix zepplin_
      <buffer>
        @type file
        path /var/log/fluentd-buffers/kubernetes.system.buffer.zepplin
        flush_mode interval
        retry_type exponential_backoff
        flush_thread_count 2
        flush_interval 10s
        retry_forever
        retry_max_interval 30
        chunk_limit_size 2M
        total_limit_size 500M
        overflow_action block
      </buffer>
    </match>
    <match **>
      @id elasticsearch
      @type elasticsearch
      @log_level debug
      type_name _doc
      include_tag_key true
      host elasticsearch-logging
      port 9200
      logstash_format true
      logstash_prefix feature_
      <buffer>
        @type file
        path /var/log/fluentd-buffers/kubernetes.system.buffer
        flush_mode interval
        retry_type exponential_backoff
        flush_thread_count 2
        flush_interval 10s
        retry_forever
        retry_max_interval 30
        chunk_limit_size 2M
        total_limit_size 500M
        overflow_action block
      </buffer>
    </match>
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: fluentd-es
  namespace: kube-system
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: fluentd-es
rules:
- apiGroups: [""]
  resources: ["namespaces", "pods"]
  verbs: ["get", "watch", "list"]
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: fluentd-es
subjects:
- kind: ServiceAccount
  name: fluentd-es
  namespace: kube-system
roleRef:
  kind: ClusterRole
  name: fluentd-es
  apiGroup: rbac.authorization.k8s.io
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: fluentd-es-v3.0.5
  namespace: kube-system
spec:
  selector:
    matchLabels:
      k8s-app: fluentd-es
  template:
    metadata:
      labels:
        k8s-app: fluentd-es
    spec:
      serviceAccountName: fluentd-es
      containers:
      - name: fluentd-es
        image: quay.io/fluentd_elasticsearch/fluentd:v3.0.5
        env:
        - name: FLUENTD_ARGS
          value: --no-supervisor -q
        resources:
          limits:
            memory: 500Mi
          requests:
            cpu: 100m
            memory: 200Mi
        volumeMounts:
        - name: varlog
          mountPath: /var/log
        - name: varlibdockercontainers
          mountPath: /var/lib/docker/containers
          readOnly: true
        - name: config-volume
          mountPath: /etc/fluent/config.d
        - name: home-container-volume
          mountPath: /home/data/docker/containers
        ports:
        - containerPort: 24231
          name: prometheus
          protocol: TCP
      terminationGracePeriodSeconds: 30
      volumes:
      - name: varlog
        hostPath:
          path: /var/log
      - name: varlibdockercontainers
        hostPath:
          path: /var/lib/docker/containers
      - name: config-volume
        configMap:
          name: fluentd-es-config-v0.2.0
      - name: home-container-volume
        hostPath:
          path: /home/data/docker/containers

Deploy the configuration with kubectl apply -f fluentd-configmap.yaml and kubectl apply -f fluentd-daemonset.yaml. After deployment, Fluentd reads the specified log files, enriches them with Kubernetes metadata, and forwards them to the Elasticsearch cluster.
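
To make the effect of the two record_transformer filters and the logstash_format setting concrete, the Python sketch below mimics them on a sample record. It is an illustration, not the plugins' code. It assumes the Elasticsearch output plugin's default "-" separator and UTC YYYY.MM.DD date suffix, and the function names, node name, and sample record are hypothetical:

```python
import copy
from datetime import datetime, timezone

# Key paths dropped by the second record_transformer filter.
REMOVE_PATHS = [
    ("docker", "container_id"),
    ("kubernetes", "container_image_id"),
    ("kubernetes", "pod_id"),
    ("kubernetes", "namespace_id"),
    ("kubernetes", "master_url"),
    ("kubernetes", "labels", "pod-template-hash"),
]


def transform(record: dict, node_hostname: str) -> dict:
    """Add the derived fields, then drop the noisy metadata keys."""
    out = copy.deepcopy(record)
    k8s = out["kubernetes"]
    out["hostname"] = f'{node_hostname}-{k8s["namespace_name"]}-{k8s["pod_name"]}'
    out["program"] = k8s["container_name"]
    out["severity"] = "info"
    out["facility"] = "local0"
    out["message"] = out["log"]
    for *parents, leaf in REMOVE_PATHS:
        node = out
        for key in parents:
            node = node.get(key, {})
        node.pop(leaf, None)
    return out


def index_name(prefix: str, when: datetime) -> str:
    """Daily index name: prefix, assumed '-' separator, then the UTC date."""
    return f"{prefix}-{when.astimezone(timezone.utc):%Y.%m.%d}"


record = {
    "log": "Job submitted\n",
    "docker": {"container_id": "abc123"},
    "kubernetes": {
        "namespace_name": "flink",
        "pod_name": "feature-job-1-jobmanager",
        "container_name": "flink-main-container",
        "pod_id": "0000",
        "labels": {"app": "feature-job-1", "pod-template-hash": "5d4f"},
    },
}
print(transform(record, "node-a"))
print(index_name("feature_", datetime(2021, 3, 1, 8, 15, tzinfo=timezone.utc)))
```

Under these assumptions the prefix feature_ yields index names such as feature_-2021.03.01, so a query pattern of feature_-* would cover all daily indices.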

Log Presentation Logic

The backend builds query conditions to retrieve Flink driver, JobManager, and TaskManager logs from Elasticsearch, selecting by index prefix and pod name field. It then concatenates the matching log entries into a single string and returns it to the frontend UI for display, which enables detailed log inspection when troubleshooting Flink jobs. This lookup depends on the Kubernetes metadata that Fluentd attaches to each log record (namespace, pod name, image name, and labels), since the pod name is what ties a record to a specific Flink component.
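
The platform's backend code is not shown in this article, so the following Python sketch is only one way such a lookup could be written. It assumes records carry a kubernetes.pod_name field with a .keyword subfield (Elasticsearch's default dynamic mapping for strings), a message field, and an @timestamp field. The function names and pod name are hypothetical:

```python
from typing import Iterable


def build_pod_log_query(pod_name: str, size: int = 1000) -> dict:
    """Search body selecting one pod's records, oldest first."""
    return {
        "size": size,
        "sort": [{"@timestamp": {"order": "asc"}}],
        "_source": ["message", "log"],
        "query": {
            "bool": {
                "filter": [
                    # Assumes the default text + keyword dynamic mapping.
                    {"term": {"kubernetes.pod_name.keyword": pod_name}}
                ]
            }
        },
    }


def join_hits(hits: Iterable[dict]) -> str:
    """Concatenate the log text of search hits into one display string."""
    lines = []
    for hit in hits:
        source = hit.get("_source", {})
        text = source.get("message") or source.get("log") or ""
        lines.append(text.rstrip("\n"))
    return "\n".join(lines)


body = build_pod_log_query("feature-job-1-jobmanager")
# `body` would be sent to the _search endpoint of the feature log indices;
# the hits come back under response["hits"]["hits"].
sample_hits = [
    {"_source": {"message": "Starting JobManager\n"}},
    {"_source": {"message": "Job feature-job-1 switched to RUNNING\n"}},
]
print(join_hits(sample_hits))
```

Sorting by @timestamp in the query keeps the concatenated output in the order the lines were emitted, and the size cap bounds how much text a single request can return to the UI.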

Conclusion

The article presented an EFK‑based solution for collecting and displaying Flink task logs on Kubernetes, covering Kubernetes logging mechanisms, Flink on Kubernetes implementation, Elasticsearch cluster setup, Fluentd architecture and configuration, and log presentation logic. Future work includes log analysis, alerting, statistical reporting, and further platform enhancements.
