Build a Fluent Bit → ClickHouse Log Pipeline on Kubernetes
This guide shows how to configure Fluent Bit to collect Kubernetes container logs, develop a Go‑based ClickHouse output plugin, deploy the pipeline with ConfigMap, RBAC and DaemonSet resources, define the ClickHouse table schema, and evaluate performance across different log rates.
Preparation and Overview
Fluent Bit is a lightweight, plugin‑based log collector written in C and fully compatible with Docker and Kubernetes environments. It processes logs through a pipeline of input, parser, filter, buffer, router, and output stages.
Fluent Bit Input Configuration for Kubernetes
[INPUT]
Name tail
Tag kube.*
Path /var/log/containers/*.log
Parser docker
DB /var/log/flb_kube.db
Mem_Buf_Limit 5MB
Skip_Long_Lines On
Refresh_Interval 10
The tail input reads container log files, applies the Docker parser, and stores offsets in a local SQLite database.
Custom ClickHouse Output Plugin (Go)
package main
import (
	"C"
	"database/sql"
	"fmt"
	"net/url"
	"os"
	"strconv"
	"sync"
	"time"
	"unsafe"

	"github.com/fluent/fluent-bit-go/output"
	"github.com/kshvakov/clickhouse"
	klog "k8s.io/klog"
)
// Package-level plugin state. It is initialised by FLBPluginInit and used by
// FLBPluginFlush.
var (
// client is the ClickHouse connection handle; assigned once FLBPluginInit has
// pinged the server successfully.
client *sql.DB
// database and table name the insert target; both are read from the
// CLICKHOUSE_DATABASE and CLICKHOUSE_TABLE environment variables.
database string
table string
// batchSize is the number of buffered records that triggers an insert.
batchSize int
// insertSQL is a format string: the two %s verbs receive database and table,
// the nine ? placeholders receive the column values of one Log record.
insertSQL = "INSERT INTO %s.%s(date, cluster, namespace, app, pod_name, container_name, host, log, ts) VALUES (?,?,?,?,?,?,?,?,?)"
// rw guards buffer; FLBPluginFlush holds it exclusively for the whole call.
rw sync.RWMutex
// buffer accumulates decoded records across flush calls until batchSize is reached.
buffer = make([]Log, 0)
)
// Fallback values used by FLBPluginInit when the corresponding environment
// variable is unset (or, for the batch size, not a valid integer).
const (
// DefaultWriteTimeout and DefaultReadTimeout are kept as strings because they
// are concatenated verbatim into the DSN (write_timeout / read_timeout).
// NOTE(review): the unit is presumably seconds — confirm against the driver.
DefaultWriteTimeout = "20"
DefaultReadTimeout = "10"
// DefaultBatchSize is the number of records buffered before an insert.
DefaultBatchSize = 1024
)
// Log is one buffered log record. FLBPluginFlush fills it from the flattened
// Fluent Bit record and later binds its fields to the INSERT statement.
type Log struct {
// Cluster comes from the "cluster" key.
Cluster string
// Namespace comes from "kubernetes_namespace_name".
Namespace string
// App comes from "kubernetes_labels_app" or "kubernetes_labels_k8s-app".
App string
// Pod comes from "kubernetes_pod_name".
Pod string
// Container comes from "kubernetes_container_name".
Container string
// Host comes from "kubernetes_host".
Host string
// Log is the raw log line taken from the "log" key.
Log string
// Ts is the record timestamp; it is written to both the date and ts columns.
Ts time.Time
}
// FLBPluginRegister announces this plugin to Fluent Bit under the name
// "clickhouse" and returns whatever status the fluent-bit-go helper reports.
//
//export FLBPluginRegister
func FLBPluginRegister(ctx unsafe.Pointer) int {
	const (
		pluginName = "clickhouse"
		pluginDesc = "ClickHouse Output Plugin"
	)
	status := output.FLBPluginRegister(ctx, pluginName, pluginDesc)
	return status
}
//export FLBPluginInit
func FLBPluginInit(ctx unsafe.Pointer) int {
// read required environment variables
host, ok := os.LookupEnv("CLICKHOUSE_HOST")
if !ok || host == "" { klog.Error("you must set host of clickhouse!"); return output.FLB_ERROR }
user, ok := os.LookupEnv("CLICKHOUSE_USER")
if !ok || user == "" { klog.Error("you must set user of clickhouse!"); return output.FLB_ERROR }
password, ok := os.LookupEnv("CLICKHOUSE_PASSWORD")
if !ok || password == "" { klog.Error("you must set password of clickhouse!"); return output.FLB_ERROR }
database, ok = os.LookupEnv("CLICKHOUSE_DATABASE")
if !ok || database == "" { klog.Error("you must set database of clickhouse!"); return output.FLB_ERROR }
table, ok = os.LookupEnv("CLICKHOUSE_TABLE")
if !ok || table == "" { klog.Error("you must set table of clickhouse!"); return output.FLB_ERROR }
// optional batch size and timeouts
if v, ok := os.LookupEnv("CLICKHOUSE_BATCH_SIZE"); ok {
if size, err := strconv.Atoi(v); err == nil { batchSize = size } else { batchSize = DefaultBatchSize }
} else { batchSize = DefaultBatchSize }
writeTimeout := os.Getenv("CLICKHOUSE_WRITE_TIMEOUT")
if writeTimeout == "" { writeTimeout = DefaultWriteTimeout }
readTimeout := os.Getenv("CLICKHOUSE_READ_TIMEOUT")
if readTimeout == "" { readTimeout = DefaultReadTimeout }
dsn := "tcp://" + host + "?username=" + user + "&password=" + password + "&database=" + database + "&write_timeout=" + writeTimeout + "&read_timeout=" + readTimeout
db, err := sql.Open("clickhouse", dsn)
if err != nil { klog.Error("connecting to clickhouse: ", err); return output.FLB_ERROR }
if err = db.Ping(); err != nil {
if exc, ok := err.(*clickhouse.Exception); ok {
klog.Errorf("[%d] %s
%s
", exc.Code, exc.Message, exc.StackTrace)
} else { klog.Errorf("Failed to ping clickhouse: %v", err) }
return output.FLB_ERROR
}
client = db
return output.FLB_OK
}
//export FLBPluginFlush
func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int {
rw.Lock()
defer rw.Unlock()
if err := client.Ping(); err != nil {
if exc, ok := err.(*clickhouse.Exception); ok {
klog.Errorf("[%d] %s
%s
", exc.Code, exc.Message, exc.StackTrace)
} else { klog.Errorf("Failed to ping clickhouse: %v", err) }
return output.FLB_ERROR
}
dec := output.NewDecoder(data, int(length))
for {
ret, tsData, mapData := output.GetRecord(dec)
if ret != 0 { break }
var ts time.Time
switch t := tsData.(type) {
case output.FLBTime:
ts = t.Time
case uint64:
ts = time.Unix(int64(t), 0)
default:
ts = time.Now()
}
flat, err := Flatten(mapData, "", UnderscoreStyle)
if err != nil { break }
var l Log
for k, v := range flat {
val := ""
switch t := v.(type) {
case string:
val = t
case []byte:
val = string(t)
default:
val = fmt.Sprintf("%v", v)
}
switch k {
case "cluster": l.Cluster = val
case "kubernetes_namespace_name": l.Namespace = val
case "kubernetes_labels_app", "kubernetes_labels_k8s-app": l.App = val
case "kubernetes_pod_name": l.Pod = val
case "kubernetes_container_name": l.Container = val
case "kubernetes_host": l.Host = val
case "log": l.Log = val
}
}
l.Ts = ts
buffer = append(buffer, l)
}
if len(buffer) < batchSize { return output.FLB_OK }
sqlStmt := fmt.Sprintf(insertSQL, database, table)
tx, err := client.Begin()
if err != nil { klog.Errorf("begin transaction failure: %s", err.Error()); return output.FLB_ERROR }
stmt, err := tx.Prepare(sqlStmt)
if err != nil { klog.Errorf("prepare statement failure: %s", err.Error()); return output.FLB_ERROR }
for _, l := range buffer {
_, err = stmt.Exec(l.Ts, l.Cluster, l.Namespace, l.App, l.Pod, l.Container, l.Host, l.Log, l.Ts)
if err != nil { klog.Errorf("statement exec failure: %s", err.Error()); return output.FLB_ERROR }
}
if err = tx.Commit(); err != nil { klog.Errorf("commit failed: %s", err.Error()); return output.FLB_ERROR }
buffer = make([]Log, 0)
return output.FLB_OK
}
//export FLBPluginExit
func FLBPluginExit() int { return output.FLB_OK }
func main() {}
The plugin registers as "clickhouse", reads connection parameters from environment variables, buffers records until the configured batch size, and writes them to ClickHouse using a prepared INSERT statement.
ClickHouse Table Schema
CREATE DATABASE IF NOT EXISTS scmp;
CREATE TABLE IF NOT EXISTS scmp.logs(
date Date DEFAULT toDate(0),
cluster String,
namespace String,
app String,
pod_name String,
container_name String,
host String,
log String,
ts DateTime
) ENGINE = MergeTree()
PARTITION BY toYYYYMMDD(date)
ORDER BY (cluster, namespace, app, pod_name, container_name, host, ts);
Kubernetes Deployment Resources
Configuration files are stored in a ConfigMap and mounted into each Fluent Bit pod.
apiVersion: v1
kind: ConfigMap
metadata:
name: k8s-log-agent-config
namespace: kube
labels:
k8s-app: k8s-log-agent
data:
fluent-bit.conf: |
[SERVICE]
Flush 1
Log_Level error
Daemon off
Parsers_File parsers.conf
HTTP_Server On
HTTP_Listen 0.0.0.0
HTTP_Port 2020
@INCLUDE input-kubernetes.conf
@INCLUDE filter-kubernetes.conf
@INCLUDE output-kubernetes.conf
input-kubernetes.conf: |
[INPUT]
Name tail
Tag kube.*
Path /var/log/containers/*.log
Parser docker
DB /var/log/flb_kube.db
Mem_Buf_Limit 5MB
Skip_Long_Lines On
Refresh_Interval 10
filter-kubernetes.conf: |
[FILTER]
Name kubernetes
Match *
Kube_URL https://kubernetes.default.svc.cluster.local:443
Merge_Log On
Annotations Off
Kube_Tag_Prefix kube.var.log.containers.
Merge_Log_Key log_processed
[FILTER]
Name modify
Match *
Set cluster ${CLUSTER_NAME}
output-kubernetes.conf: |
[OUTPUT]
Name clickhouse
Match *
RBAC permissions allow the DaemonSet to read pods and namespaces.
# Binds the k8s-log-agent-read ClusterRole to the Fluent Bit ServiceAccount.
# The subject namespace must match the namespace in which the ServiceAccount
# and the DaemonSet are created (kube); otherwise the pods get no permissions.
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: k8s-log-agent-read
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: k8s-log-agent-read
subjects:
  - kind: ServiceAccount
    name: k8s-log-agent
    namespace: kube
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: k8s-log-agent-read
rules:
- apiGroups: [""]
resources:
- namespaces
- pods
verbs: ["get","list","watch"]
---
apiVersion: v1
kind: ServiceAccount
metadata:
name: k8s-log-agent
namespace: kube
The DaemonSet runs Fluent Bit on every node, mounts the ConfigMap, and injects environment variables for ClickHouse connection details.
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: k8s-log-agent
namespace: kube
labels:
k8s-app: k8s-log-agent
version: v1
spec:
selector:
matchLabels:
k8s-app: k8s-log-agent
template:
metadata:
labels:
k8s-app: k8s-log-agent
version: v1
spec:
containers:
- name: fluent-bit
image: iyacontrol/fluent-bit-ck:1.2.2
ports:
- containerPort: 2020
resources:
limits:
cpu: 200m
memory: 200Mi
requests:
cpu: 200m
memory: 200Mi
env:
- name: CLUSTER_NAME
value: "clickhouse"
- name: CLICKHOUSE_HOST
value: "10.1.62.62:9150"
- name: CLICKHOUSE_USER
value: "oms"
- name: CLICKHOUSE_PASSWORD
value: "EBupt123"
- name: CLICKHOUSE_DATABASE
value: "scmp"
- name: CLICKHOUSE_TABLE
value: "logs"
- name: NODENAME
valueFrom:
fieldRef:
fieldPath: spec.nodeName
volumeMounts:
- name: varlog
mountPath: /var/log
- name: varlibdockercontainers
mountPath: /var/lib/docker/containers
readOnly: true
- name: k8s-log-agent-config
mountPath: /fluent-bit/etc/
terminationGracePeriodSeconds: 10
volumes:
- name: varlog
hostPath:
path: /var/log
- name: varlibdockercontainers
hostPath:
path: /var/lib/docker/containers
- name: k8s-log-agent-config
configMap:
name: k8s-log-agent-config
serviceAccountName: k8s-log-agent
tolerations:
- key: node-role.kubernetes.io/master
operator: Exists
effect: NoSchedule
Performance Test
The test measured CPU and memory usage of Fluent Bit under various log generation rates (5000, 1000, 500, 200, 50, and 1 log per second) for ten minutes, using htop to observe resource consumption and latency until log backlog appeared.
Results showed that at 1000 log/s and 500 log/s the CPU usage remained stable within a 10 % fluctuation band. Higher or lower rates (5000 log/s, 200 log/s, 50 log/s, 1 log/s) caused larger CPU swings, with peaks up to 52 % and troughs down to 35 %.
Latency analysis indicated that after three minutes of sustained load, the backlog grew to a point where logs generated within a ten‑second window were not yet persisted, especially at the highest and lowest rates.
Overall, the pipeline demonstrates reliable log collection and storage at moderate rates, while extreme rates expose resource saturation and increased latency.
Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact us and we will review it promptly.
Cloud Native Technology Community
The Cloud Native Technology Community, part of the CNBPA Cloud Native Technology Practice Alliance, focuses on evangelizing cutting‑edge cloud‑native technologies and practical implementations. It shares in‑depth content, case studies, and event/meetup information on containers, Kubernetes, DevOps, Service Mesh, and other cloud‑native tech, along with updates from the CNBPA alliance.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
