How to Build a Scalable Kubernetes Logging Pipeline with EFK and Fluentd
This article explains how to collect, process, and visualize Flink job logs on Kubernetes with an EFK (Elasticsearch, Fluentd, Kibana) stack. It covers common Kubernetes logging architectures, the deployment of Elasticsearch, Kibana, and Fluentd, and the backend logic that queries and displays the logs in a feature platform.
Background
The feature platform is designed for data scientists, data engineers, and machine learning engineers to build AI feature stores for model training, testing, and other uses. It addresses problems such as scattered data storage, duplicated features, complex extraction, and data that is hard to consume. It pulls data from HBase, Hive, relational databases, and other ODS-layer sources into a unified feature store. Its core ETL runs on Flink deployed on a Kubernetes cluster, so the platform must collect and present Flink job logs (driver, JobManager, and TaskManager). Unlike Flink on YARN, where job logs are reachable through YARN's web UI and log aggregation, Kubernetes has no built-in equivalent, so a solution for collecting, storing, extracting, and displaying logs is required.
This article shares common Kubernetes logging solutions and the end‑to‑end EFK‑based log pipeline for PyFlink on Kubernetes.
Kubernetes Logging Solutions
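Containers write logs to stdout and stderr, and the container runtime saves these streams to files on the node; with Docker's default json-file logging driver, each line is a JSON object holding the log text, the stream name, and a timestamp. Logs are typically viewed with kubectl logs or docker logs. Because these node-local files disappear when a pod is deleted or a node fails, logs that must outlive them need cluster-level logging. Kubernetes does not ship a native cluster-level logging solution, but its documentation describes three typical approaches: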
Deploy a DaemonSet with a logging agent on each node.
Include a sidecar container in the pod to collect application logs.
Push logs directly from the application to a backend storage.
1. DaemonSet Logging Agent
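Run a logging agent on every node as a DaemonSet (commonly Fluentd, shipping logs to Elasticsearch). The agent mounts the host directory /var/log/containers, whose files are symlinks that ultimately resolve into the container runtime's log directory (for Docker, /var/lib/docker/containers), so that directory must be mounted as well. This method is non-intrusive to the application, but it requires the application to write its logs to stdout / stderr.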
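To make the node-level layout concrete, here is a minimal Python sketch of what an agent does with each file under /var/log/containers. It assumes Docker's json-file logging driver (the Fluentd configuration later in this article also parses these files as JSON) and the kubelet's symlink naming scheme, <pod>_<namespace>_<container>-<container ID>.log. The function names are hypothetical, and the regular expression is a simplified version of what Fluentd's Kubernetes metadata filter applies to the tag; it is an illustration, not Fluentd's code.

```python
import json
import re
from datetime import datetime, timezone

# Kubelet log symlink name: <pod>_<namespace>_<container>-<64-hex container ID>.log
LOG_NAME = re.compile(
    r"^(?P<pod>[a-z0-9][-a-z0-9.]*)_(?P<namespace>[^_]+)_"
    r"(?P<container>.+)-(?P<container_id>[a-f0-9]{64})\.log$"
)


def parse_log_path(path: str) -> dict:
    """Split a /var/log/containers file name into pod, namespace, container, and ID."""
    name = path.rsplit("/", 1)[-1]
    match = LOG_NAME.match(name)
    if match is None:
        raise ValueError(f"unexpected log file name: {name}")
    return match.groupdict()


def parse_docker_line(line: str) -> dict:
    """Parse one line written by Docker's json-file logging driver."""
    entry = json.loads(line)
    # Docker writes RFC 3339 timestamps with nanoseconds, e.g. 2021-03-01T08:00:00.123456789Z;
    # datetime keeps only microseconds, so trim the fraction to 6 digits.
    seconds, _, fraction = entry["time"].rstrip("Z").partition(".")
    stamp = datetime.strptime(
        f"{seconds}.{fraction[:6].ljust(6, '0')}", "%Y-%m-%dT%H:%M:%S.%f"
    ).replace(tzinfo=timezone.utc)
    return {
        "message": entry["log"].rstrip("\n"),  # the driver keeps the trailing newline
        "stream": entry["stream"],             # "stdout" or "stderr"
        "time": stamp,
    }
```

The file name alone already yields the pod, namespace, and container; the metadata filter uses these values to look up labels and other details from the API server.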
2. Sidecar Container
When an application writes logs to files instead of stdout / stderr, a sidecar container can mount the same log directory, read the files, and re-emit their contents to its own stdout / stderr, where the DaemonSet agent collects them. The cost is that every log line is stored twice, which increases storage usage.
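In practice this streaming sidecar is often just a busybox container running tail on the file, as in the Kubernetes documentation's example. The Python sketch below spells out the same idea under stated assumptions: stream_file is a hypothetical helper, it forwards only complete lines while following, and it does not handle log rotation.

```python
import sys
import time
from typing import TextIO


def stream_file(path: str, out: TextIO = sys.stdout,
                poll_seconds: float = 1.0, follow: bool = True) -> None:
    """Re-emit an application's log file on `out` (stdout by default).

    Only complete lines are forwarded while following, so a line the
    application is still writing is never split. With follow=False the
    function copies what is currently in the file and returns.
    """
    pending = ""
    with open(path, "r", encoding="utf-8") as src:
        while True:
            chunk = src.readline()
            if chunk:
                pending += chunk
                if pending.endswith("\n"):
                    out.write(pending)
                    out.flush()
                    pending = ""
            elif follow:
                time.sleep(poll_seconds)  # wait for the application to append more
            else:
                if pending:
                    out.write(pending + "\n")  # flush a trailing partial line
                return
```

Because the sidecar writes to its own stdout, the copy lands in the runtime's log files and the node-level agent picks it up like any other container's output.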
3. Direct Log Push
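Applications can push logs directly to a remote store, or hand them to a sidecar container that runs its own logging agent. Either way the logs bypass the files that kubectl logs reads, so kubectl logs can no longer show them, and a sidecar logging agent in every pod adds resource overhead.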
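For comparison, direct push from application code can look like the following Python sketch: a logging handler that batches records into an Elasticsearch _bulk request body. The class name, index name, and batching policy are hypothetical, and the actual HTTP call is left to an injected send function so the sketch does not assume any particular client library.

```python
import json
import logging
from datetime import datetime, timezone
from typing import Callable, List


class BulkLogHandler(logging.Handler):
    """Buffer log records and ship them as an Elasticsearch _bulk (NDJSON) body.

    `send` is supplied by the caller (for example, a function that POSTs the
    body to <elasticsearch>/_bulk), so the sketch needs no client library.
    """

    def __init__(self, index: str, send: Callable[[str], None], batch_size: int = 100):
        super().__init__()
        self.index = index
        self.send = send
        self.batch_size = batch_size
        self.pending: List[logging.LogRecord] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.pending.append(record)
        if len(self.pending) >= self.batch_size:
            self.flush()

    def flush(self) -> None:
        self.acquire()  # the handler lock is an RLock, so re-entry from emit() is safe
        try:
            if not self.pending:
                return
            lines = []
            for record in self.pending:
                lines.append(json.dumps({"index": {"_index": self.index}}))
                lines.append(json.dumps({
                    "@timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
                    "level": record.levelname,
                    "logger": record.name,
                    "message": self.format(record),
                }))
            self.pending.clear()
            # The _bulk API expects every line, including the last, to end with "\n".
            self.send("\n".join(lines) + "\n")
        finally:
            self.release()
```

Nothing here reaches stdout, which is exactly why kubectl logs shows nothing for such an application, and buffering, retries, and backpressure all become the application's responsibility.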
The Kubernetes documentation describes the node-level agent (the first method) as the most common and encouraged approach: it is simple to manage, keeps kubectl logs and docker logs working, and has minimal impact on application performance. The feature platform adopts it: Fluentd collects container logs, enriches them with Kubernetes metadata, and forwards them to Elasticsearch.
Fluentd System
Fluentd is an open‑source data collector that handles log ingestion, processing, and forwarding in a single system. Its rich plugin ecosystem allows it to collect logs from various sources, transform them into user‑defined formats, and send them to chosen storage backends, freeing users from complex log‑processing pipelines.
The basic workflow is:
Fluentd gathers data from multiple log sources.
It tags and structures the data according to configured filters.
It routes the data to one or more destination services.
Fluentd is written in a combination of C and Ruby, is highly extensible through plugins, and consumes relatively few resources, making it suitable for most enterprise environments.
In a Kubernetes cluster, Fluentd runs on each node, reads container log files, applies filters and transformations, and forwards the enriched logs to an Elasticsearch cluster. Applications can then query Elasticsearch directly or visualize logs with Kibana.
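The gather, filter, and route workflow described above can be sketched in Python to show how tags drive routing. This is a simplified model, not Fluentd's implementation: in a tag pattern, `*` matches exactly one dot-separated tag part and `**` matches zero or more parts; every matching filter runs in order, and an event goes only to the first matching output. Brace alternatives such as {a,b} and the ordering of filters relative to match blocks are not modeled, and the Router class and its fields are hypothetical.

```python
from typing import Callable, Dict, List, Optional, Tuple

Record = Dict[str, object]
FilterFn = Callable[[str, Record], Optional[Record]]
OutputFn = Callable[[str, Record], None]


def tag_matches(pattern: str, tag: str) -> bool:
    """Fluentd-style tag match: '*' = exactly one tag part, '**' = zero or more parts."""
    return _match(pattern.split("."), tag.split("."))


def _match(pat: List[str], parts: List[str]) -> bool:
    if not pat:
        return not parts
    head, rest = pat[0], pat[1:]
    if head == "**":
        # try consuming 0, 1, 2, ... tag parts
        return any(_match(rest, parts[i:]) for i in range(len(parts) + 1))
    if parts and (head == "*" or head == parts[0]):
        return _match(rest, parts[1:])
    return False


class Router:
    """Apply every matching filter in order, then hand the event to the first matching output."""

    def __init__(self) -> None:
        self.filters: List[Tuple[str, FilterFn]] = []
        self.outputs: List[Tuple[str, OutputFn]] = []

    def emit(self, tag: str, record: Record) -> None:
        for pattern, fn in self.filters:
            if tag_matches(pattern, tag):
                record = fn(tag, record)
                if record is None:  # a filter may drop the event
                    return
        for pattern, out in self.outputs:
            if tag_matches(pattern, tag):
                out(tag, record)
                return  # first match wins, like successive <match> blocks
```

This ordering is why the configuration later in the article places the specific `<match zepplinlog>` block before the catch-all `<match **>`: reversed, the catch-all would swallow the Zeppelin events.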
EFK Log System Implementation
Overall Business Flow
Flink tasks run in a Kubernetes cluster, generating logs that are stored on each node under /var/log/containers. A DaemonSet runs Fluentd on every node, mounts the log directory, processes the logs, and pushes them to Elasticsearch. The stored logs are queried by the feature platform backend and visualized in Kibana.
Elasticsearch Deployment on Kubernetes
Elasticsearch can be deployed with the official ECK (Elastic Cloud on Kubernetes) operator or manually from YAML manifests. The manifests below take the manual route: RBAC objects and a StatefulSet for a two-node Elasticsearch cluster.
```yaml
# RBAC: service account and its cluster role binding
apiVersion: v1
kind: ServiceAccount
metadata:
  name: elasticsearch-logging
  namespace: kube-system
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: elasticsearch-logging
subjects:
- kind: ServiceAccount
  name: elasticsearch-logging
  namespace: kube-system
roleRef:
  kind: ClusterRole
  # Binding the built-in "admin" role cluster-wide is very broad; consider a
  # narrower ClusterRole limited to what Elasticsearch discovery needs.
  name: admin
  apiGroup: rbac.authorization.k8s.io
---
# Elasticsearch StatefulSet
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: elasticsearch-logging
  namespace: kube-system
spec:
  serviceName: elasticsearch-logging
  replicas: 2
  selector:
    matchLabels:
      k8s-app: elasticsearch-logging
  template:
    metadata:
      labels:
        k8s-app: elasticsearch-logging
    spec:
      serviceAccountName: elasticsearch-logging
      containers:
      - name: elasticsearch-logging
        image: quay.io/fluentd_elasticsearch/elasticsearch:v7.4.2
        ports:
        - containerPort: 9200
          name: db
        - containerPort: 9300
          name: transport
        resources:
          limits:
            cpu: 1000m
            memory: 3Gi
          requests:
            cpu: 100m
            memory: 3Gi
        volumeMounts:
        - name: elasticsearch-logging
          mountPath: /data
        env:
        - name: NAMESPACE
          valueFrom:
            fieldRef:
              fieldPath: metadata.namespace
        - name: MINIMUM_MASTER_NODES
          value: "1"
      volumes:
      # emptyDir data is lost whenever a pod is rescheduled;
      # use volumeClaimTemplates for persistent storage.
      - name: elasticsearch-logging
        emptyDir: {}
      initContainers:
      # Elasticsearch requires vm.max_map_count >= 262144 on the host.
      - name: elasticsearch-logging-init
        image: alpine:3.6
        command: ["/sbin/sysctl", "-w", "vm.max_map_count=262144"]
        securityContext:
          privileged: true
```

To expose the cluster outside Kubernetes, a NodePort Service is created:
```yaml
apiVersion: v1
kind: Service
metadata:
  name: elasticsearch-logging
  namespace: kube-system
spec:
  type: NodePort
  ports:
  - name: db
    port: 9200
    targetPort: 9200
  - name: transport
    port: 9300
    targetPort: 9300
  selector:
    k8s-app: elasticsearch-logging
```

Kibana Deployment on Kubernetes
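Kibana, part of the Elastic Stack, provides visualization and analysis of data stored in Elasticsearch. Deploy it with a standard Deployment manifest; ELASTICSEARCH_HOSTS points at the elasticsearch-logging Service, which resolves because both run in the kube-system namespace. A Service for port 5601 (not shown) is also needed to reach the UI.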
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kibana-logging
  namespace: kube-system
spec:
  replicas: 1
  selector:
    matchLabels:
      k8s-app: kibana-logging
  template:
    metadata:
      labels:
        k8s-app: kibana-logging
    spec:
      containers:
      - name: kibana-logging
        image: docker.elastic.co/kibana/kibana-oss:7.4.2
        env:
        - name: ELASTICSEARCH_HOSTS
          value: http://elasticsearch-logging:9200
        - name: SERVER_NAME
          value: kibana-logging
        - name: SERVER_REWRITEBASEPATH
          value: "false"
        ports:
        - containerPort: 5601
          name: ui
        livenessProbe:
          httpGet:
            path: /api/status
            port: ui
          initialDelaySeconds: 5
          timeoutSeconds: 10
        readinessProbe:
          httpGet:
            path: /api/status
            port: ui
          initialDelaySeconds: 5
          timeoutSeconds: 10
```

Fluentd Deployment on Kubernetes
Create a ConfigMap that contains Fluentd configuration files, then deploy a DaemonSet with the necessary RBAC permissions.
```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: fluentd-es-config-v0.2.0
  namespace: kube-system
data:
  containers.input.conf: |-
    # Feature platform (Flink) container logs, in Docker json-file format.
    <source>
      @type tail
      path /var/log/containers/feature*.log
      pos_file /var/log/es-containers.log.pos
      tag "kubernetes.*"
      read_from_head true
      <parse>
        @type json
        time_format %Y-%m-%dT%H:%M:%S.%NZ
      </parse>
    </source>
    # Zeppelin server container logs, routed to their own indices.
    <source>
      @type tail
      path /var/log/containers/zeppelin-server-*.log
      pos_file /var/log/zepplin-containers.log.pos
      tag zepplinlog
      read_from_head true
      <parse>
        @type json
        time_format %Y-%m-%dT%H:%M:%S.%NZ
      </parse>
    </source>
    # Look up namespace, pod, container, image, and labels from the API server;
    # the record_transformer filters below depend on record["kubernetes"].
    <filter kubernetes.**>
      @id filter_kubernetes_metadata
      @type kubernetes_metadata
    </filter>
    <filter kubernetes.**>
      @type record_transformer
      @id filter_rt_kube_logs
      enable_ruby true
      <record>
        hostname ${ENV['FLUENT_HOSTNAME']}-${record["kubernetes"]["namespace_name"]}-${record["kubernetes"]["pod_name"]}
        program ${record["kubernetes"]["container_name"]}
        severity info
        facility local0
        message ${record['log']}
      </record>
    </filter>
    # Drop metadata fields that are not needed for log display.
    <filter kubernetes.**>
      @type record_transformer
      remove_keys $.docker.container_id,$.kubernetes.container_image_id,$.kubernetes.pod_id,$.kubernetes.namespace_id,$.kubernetes.master_url,$.kubernetes.labels.pod-template-hash
    </filter>
    <match zepplinlog>
      @id zepplin_es
      @type elasticsearch
      @log_level info
      type_name _doc
      include_tag_key true
      host elasticsearch-logging
      port 9200
      logstash_format true
      logstash_prefix zepplin_
      <buffer>
        @type file
        path /var/log/fluentd-buffers/kubernetes.system.buffer.zepplin
        flush_mode interval
        retry_type exponential_backoff
        flush_thread_count 2
        flush_interval 10s
        retry_forever
        retry_max_interval 30
        chunk_limit_size 2M
        total_limit_size 500M
        overflow_action block
      </buffer>
    </match>
    # All remaining events (the kubernetes.** stream) go to the feature_ indices.
    <match **>
      @id elasticsearch
      @type elasticsearch
      @log_level debug
      type_name _doc
      include_tag_key true
      host elasticsearch-logging
      port 9200
      logstash_format true
      logstash_prefix feature_
      <buffer>
        @type file
        path /var/log/fluentd-buffers/kubernetes.system.buffer
        flush_mode interval
        retry_type exponential_backoff
        flush_thread_count 2
        flush_interval 10s
        retry_forever
        retry_max_interval 30
        chunk_limit_size 2M
        total_limit_size 500M
        overflow_action block
      </buffer>
    </match>
```

The DaemonSet and its RBAC objects (fluentd-daemonset.yaml):

```yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: fluentd-es
  namespace: kube-system
---
# The kubernetes_metadata filter reads pods and namespaces from the API server.
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: fluentd-es
rules:
- apiGroups: [""]
  resources: ["namespaces", "pods"]
  verbs: ["get", "watch", "list"]
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: fluentd-es
subjects:
- kind: ServiceAccount
  name: fluentd-es
  namespace: kube-system
roleRef:
  kind: ClusterRole
  name: fluentd-es
  apiGroup: rbac.authorization.k8s.io
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: fluentd-es-v3.0.5
  namespace: kube-system
spec:
  selector:
    matchLabels:
      k8s-app: fluentd-es
  template:
    metadata:
      labels:
        k8s-app: fluentd-es
    spec:
      serviceAccountName: fluentd-es
      containers:
      - name: fluentd-es
        image: quay.io/fluentd_elasticsearch/fluentd:v3.0.5
        env:
        - name: FLUENTD_ARGS
          value: --no-supervisor -q
        # Assumption (not in the original manifest): expose the node name as
        # FLUENT_HOSTNAME, which the record_transformer filter reads; if unset,
        # the hostname field starts with "-".
        - name: FLUENT_HOSTNAME
          valueFrom:
            fieldRef:
              fieldPath: spec.nodeName
        resources:
          limits:
            memory: 500Mi
          requests:
            cpu: 100m
            memory: 200Mi
        volumeMounts:
        # Writable: Fluentd keeps its pos files and buffers under /var/log.
        - name: varlog
          mountPath: /var/log
        - name: varlibdockercontainers
          mountPath: /var/lib/docker/containers
          readOnly: true
        - name: config-volume
          mountPath: /etc/fluent/config.d
        # Presumably the nodes' custom Docker data root; the symlinks in
        # /var/log/containers must resolve inside the pod.
        - name: home-container-volume
          mountPath: /home/data/docker/containers
          readOnly: true
        ports:
        - containerPort: 24231
          name: prometheus
          protocol: TCP
      terminationGracePeriodSeconds: 30
      volumes:
      - name: varlog
        hostPath:
          path: /var/log
      - name: varlibdockercontainers
        hostPath:
          path: /var/lib/docker/containers
      - name: config-volume
        configMap:
          name: fluentd-es-config-v0.2.0
      - name: home-container-volume
        hostPath:
          path: /home/data/docker/containers
```

Deploy the configuration with kubectl apply -f fluentd-configmap.yaml and kubectl apply -f fluentd-daemonset.yaml. Once the DaemonSet pods are running, Fluentd tails the matching log files on every node, enriches each record with Kubernetes metadata, and forwards it to the Elasticsearch cluster.
Log Presentation Logic
The backend builds Elasticsearch queries from the index prefix and the pod name field to retrieve a Flink job's driver, JobManager, and TaskManager logs, concatenates the matching entries into a single string, and returns it to the frontend for display, which supports detailed troubleshooting of Flink jobs. This depends on Fluentd's Kubernetes metadata filter (kubernetes_metadata, from the fluent-plugin-kubernetes_metadata_filter plugin): it attaches the namespace, pod name, image name, and labels to every record, which is what makes a pod-name query possible. The filter reads this metadata from the API server, which is why the fluentd-es ClusterRole grants get, list, and watch on pods and namespaces.
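The article describes the query only in prose, so here is a hedged Python sketch using just the standard library. It assumes the field names this pipeline produces: message from the record_transformer filter, kubernetes.pod_name from the metadata filter, and @timestamp, which the Elasticsearch output adds when logstash_format is true. Matching a job's pods by pod-name prefix, the .keyword subfield (which relies on Elasticsearch's default dynamic mapping), and the function names are assumptions for illustration.

```python
import json
import urllib.request
from typing import Any, Dict


def build_log_query(pod_name_prefix: str, size: int = 1000) -> Dict[str, Any]:
    """Query DSL for the log lines of every pod whose name starts with a prefix, oldest first."""
    return {
        # By default from + size may not exceed 10,000 (index.max_result_window);
        # page with search_after for longer logs.
        "size": size,
        "sort": [{"@timestamp": {"order": "asc"}}],
        "_source": ["@timestamp", "message", "kubernetes.pod_name"],
        "query": {
            "bool": {
                "filter": [
                    {"prefix": {"kubernetes.pod_name.keyword": pod_name_prefix}}
                ]
            }
        },
    }


def search(es_url: str, index_pattern: str, body: Dict[str, Any]) -> Dict[str, Any]:
    """POST the query to <es_url>/<index_pattern>/_search and return the decoded response."""
    request = urllib.request.Request(
        f"{es_url}/{index_pattern}/_search",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=10) as response:
        return json.load(response)


def concat_hits(response: Dict[str, Any]) -> str:
    """Join the message field of each hit into one string for the frontend."""
    lines = [
        hit.get("_source", {}).get("message", "").rstrip("\n")
        for hit in response.get("hits", {}).get("hits", [])
    ]
    return "\n".join(lines)
```

For example, concat_hits(search("http://<node-ip>:<node-port>", "feature_-*", build_log_query("<job pod prefix>"))) would return one job's log text. With logstash_format true and logstash_prefix feature_, the Elasticsearch output names daily indices feature_-YYYY.MM.DD, because its default prefix separator is "-", hence the feature_-* pattern.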
Conclusion
This article presented an EFK-based solution for collecting and displaying Flink job logs on Kubernetes, covering Kubernetes logging architectures, the deployment of Elasticsearch and Kibana, Fluentd's architecture and configuration, and the backend log-presentation logic. Future work includes log analysis, alerting, statistical reporting, and further platform enhancements.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact us and we will review it promptly.
