Cloud Native 19 min read

Build a Fluent Bit → ClickHouse Log Pipeline on Kubernetes

This guide shows how to configure Fluent Bit to collect Kubernetes container logs, develop a Go‑based ClickHouse output plugin, deploy the pipeline with ConfigMap, RBAC and DaemonSet resources, define the ClickHouse table schema, and evaluate performance across different log rates.

Cloud Native Technology Community
Build a Fluent Bit → ClickHouse Log Pipeline on Kubernetes

Preparation and Overview

Fluent Bit is a lightweight, plugin‑based log collector written in C and fully compatible with Docker and Kubernetes environments. It processes logs through a pipeline of input, parser, filter, buffer, router, and output stages.

Fluent Bit Input Configuration for Kubernetes

# Tail every container log file matching /var/log/containers/*.log, tag the
# records kube.*, parse each line with the "docker" parser, and keep the
# per-file read offsets in the local database /var/log/flb_kube.db.
[INPUT]
  Name   tail
  Tag    kube.*
  Path   /var/log/containers/*.log
  Parser docker
  DB     /var/log/flb_kube.db
  Mem_Buf_Limit 5MB
  Skip_Long_Lines On
  Refresh_Interval 10

The tail input reads container log files, applies the Docker parser, and stores offsets in a local SQLite database.

Custom ClickHouse Output Plugin (Go)

package main

import (
    "C"
    "database/sql"
    "fmt"
    "net/url"
    "os"
    "strconv"
    "sync"
    "time"
    "unsafe"

    "github.com/fluent/fluent-bit-go/output"
    "github.com/kshvakov/clickhouse"
    klog "k8s.io/klog"
)

// Package-level plugin state shared by the Fluent Bit callbacks.
// FLBPluginInit fills in the connection and target settings; FLBPluginFlush
// appends decoded records to buffer while holding rw.
var (
    // client is the ClickHouse handle opened by FLBPluginInit.
    client *sql.DB

    // Insert target and the number of buffered records that triggers a write.
    database, table string
    batchSize       int

    // insertSQL is a format string: the two %s verbs receive the database and
    // table names, and the nine ? placeholders bind the fields of one Log.
    insertSQL = "INSERT INTO %s.%s(date, cluster, namespace, app, pod_name, container_name, host, log, ts) VALUES (?,?,?,?,?,?,?,?,?)"

    // rw serialises access to buffer across flush callbacks.
    rw sync.RWMutex

    // buffer holds decoded records that have not been written yet
    // (non-nil and empty at start-up).
    buffer = []Log{}
)

// Fallback values that FLBPluginInit uses when the corresponding optional
// environment variables are not provided.
const (
    // DefaultBatchSize is the number of buffered records that triggers an insert.
    DefaultBatchSize = 1024

    // The timeouts are strings because they are copied verbatim into the
    // ClickHouse DSN query string.
    DefaultReadTimeout  = "10"
    DefaultWriteTimeout = "20"
)

// Log is one container log line together with the Kubernetes metadata
// extracted from a Fluent Bit record. Its fields are bound, in order, to the
// placeholders of insertSQL, with Ts used for both the date and ts columns.
type Log struct {
    // Origin of the line: cluster name plus Kubernetes namespace, app label,
    // pod, container and host.
    Cluster, Namespace, App, Pod, Container, Host string

    // Log is the message text.
    Log string

    // Ts is the record timestamp.
    Ts time.Time
}

//export FLBPluginRegister
func FLBPluginRegister(ctx unsafe.Pointer) int {
    return output.FLBPluginRegister(ctx, "clickhouse", "ClickHouse Output Plugin")
}

//export FLBPluginInit
func FLBPluginInit(ctx unsafe.Pointer) int {
    // read required environment variables
    host, ok := os.LookupEnv("CLICKHOUSE_HOST")
    if !ok || host == "" { klog.Error("you must set host of clickhouse!"); return output.FLB_ERROR }
    user, ok := os.LookupEnv("CLICKHOUSE_USER")
    if !ok || user == "" { klog.Error("you must set user of clickhouse!"); return output.FLB_ERROR }
    password, ok := os.LookupEnv("CLICKHOUSE_PASSWORD")
    if !ok || password == "" { klog.Error("you must set password of clickhouse!"); return output.FLB_ERROR }
    database, ok = os.LookupEnv("CLICKHOUSE_DATABASE")
    if !ok || database == "" { klog.Error("you must set database of clickhouse!"); return output.FLB_ERROR }
    table, ok = os.LookupEnv("CLICKHOUSE_TABLE")
    if !ok || table == "" { klog.Error("you must set table of clickhouse!"); return output.FLB_ERROR }

    // optional batch size and timeouts
    if v, ok := os.LookupEnv("CLICKHOUSE_BATCH_SIZE"); ok {
        if size, err := strconv.Atoi(v); err == nil { batchSize = size } else { batchSize = DefaultBatchSize }
    } else { batchSize = DefaultBatchSize }
    writeTimeout := os.Getenv("CLICKHOUSE_WRITE_TIMEOUT")
    if writeTimeout == "" { writeTimeout = DefaultWriteTimeout }
    readTimeout := os.Getenv("CLICKHOUSE_READ_TIMEOUT")
    if readTimeout == "" { readTimeout = DefaultReadTimeout }

    dsn := "tcp://" + host + "?username=" + user + "&password=" + password + "&database=" + database + "&write_timeout=" + writeTimeout + "&read_timeout=" + readTimeout
    db, err := sql.Open("clickhouse", dsn)
    if err != nil { klog.Error("connecting to clickhouse: ", err); return output.FLB_ERROR }
    if err = db.Ping(); err != nil {
        if exc, ok := err.(*clickhouse.Exception); ok {
            klog.Errorf("[%d] %s 
%s
", exc.Code, exc.Message, exc.StackTrace)
        } else { klog.Errorf("Failed to ping clickhouse: %v", err) }
        return output.FLB_ERROR
    }
    client = db
    return output.FLB_OK
}

// FLBPluginFlush is called by Fluent Bit with a msgpack-encoded chunk of
// records. Each record is flattened, mapped onto a Log and appended to the
// shared buffer. Once the buffer holds at least batchSize records, writeBuffer
// inserts all of them into ClickHouse in one transaction.
//
// Return values:
//   - output.FLB_RETRY if ClickHouse is unreachable before the chunk is
//     decoded. Nothing has been buffered yet, so Fluent Bit should redeliver
//     the chunk.
//   - output.FLB_ERROR if the batch insert fails. The records stay in buffer
//     and are retried on the next flush, so redelivery would duplicate them.
//   - output.FLB_OK otherwise.
//export FLBPluginFlush
func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int {
    rw.Lock()
    defer rw.Unlock()

    if err := client.Ping(); err != nil {
        if exc, ok := err.(*clickhouse.Exception); ok {
            klog.Errorf("[%d] %s \n%s\n", exc.Code, exc.Message, exc.StackTrace)
        } else {
            klog.Errorf("Failed to ping clickhouse: %v", err)
        }
        return output.FLB_RETRY
    }

    dec := output.NewDecoder(data, int(length))
    for {
        ret, tsData, mapData := output.GetRecord(dec)
        if ret != 0 {
            break // no more records in this chunk
        }

        var ts time.Time
        switch t := tsData.(type) {
        case output.FLBTime:
            ts = t.Time
        case uint64:
            ts = time.Unix(int64(t), 0)
        default:
            ts = time.Now()
        }

        flat, err := Flatten(mapData, "", UnderscoreStyle)
        if err != nil {
            // Skip just this record instead of abandoning the rest of the chunk.
            klog.Errorf("flattening record: %v", err)
            continue
        }

        l := Log{Ts: ts}
        for k, v := range flat {
            // Pick the destination field first so values of keys we do not
            // store are never converted to strings.
            var dst *string
            switch k {
            case "cluster":
                dst = &l.Cluster
            case "kubernetes_namespace_name":
                dst = &l.Namespace
            case "kubernetes_labels_app", "kubernetes_labels_k8s-app":
                // If both labels are present, whichever key the map yields
                // last wins (map iteration order is random).
                dst = &l.App
            case "kubernetes_pod_name":
                dst = &l.Pod
            case "kubernetes_container_name":
                dst = &l.Container
            case "kubernetes_host":
                dst = &l.Host
            case "log":
                dst = &l.Log
            default:
                continue
            }
            switch t := v.(type) {
            case string:
                *dst = t
            case []byte:
                *dst = string(t)
            default:
                *dst = fmt.Sprintf("%v", v)
            }
        }
        buffer = append(buffer, l)
    }

    if len(buffer) < batchSize {
        return output.FLB_OK
    }
    if err := writeBuffer(); err != nil {
        klog.Error(err)
        return output.FLB_ERROR
    }
    return output.FLB_OK
}

// writeBuffer inserts every buffered record into database.table inside a
// single transaction and empties the buffer on success. On failure the
// transaction is rolled back and the buffer is left intact, so the records are
// retried by a later call. The caller must hold rw.
func writeBuffer() (err error) {
    tx, err := client.Begin()
    if err != nil {
        return fmt.Errorf("begin transaction failure: %w", err)
    }
    defer func() {
        if err != nil {
            // Best effort: the original error is what the caller needs to see.
            _ = tx.Rollback()
        }
    }()

    stmt, err := tx.Prepare(fmt.Sprintf(insertSQL, database, table))
    if err != nil {
        return fmt.Errorf("prepare statement failure: %w", err)
    }
    defer stmt.Close()

    for _, l := range buffer {
        // Ts is bound twice: once for the date column and once for ts.
        if _, err = stmt.Exec(l.Ts, l.Cluster, l.Namespace, l.App, l.Pod, l.Container, l.Host, l.Log, l.Ts); err != nil {
            return fmt.Errorf("statement exec failure: %w", err)
        }
    }
    if err = tx.Commit(); err != nil {
        return fmt.Errorf("commit failed: %w", err)
    }
    buffer = make([]Log, 0)
    return nil
}

// FLBPluginExit is called by Fluent Bit when the plugin shuts down. Records
// still buffered below the batchSize threshold are written first so they are
// not lost, then the ClickHouse handle is closed.
//export FLBPluginExit
func FLBPluginExit() int {
    rw.Lock()
    defer rw.Unlock()
    if client == nil {
        return output.FLB_OK // FLBPluginInit never connected
    }
    if len(buffer) > 0 {
        if err := writeBuffer(); err != nil {
            klog.Errorf("dropping %d buffered records on exit: %v", len(buffer), err)
        }
    }
    if err := client.Close(); err != nil {
        klog.Errorf("closing clickhouse connection: %v", err)
    }
    return output.FLB_OK
}

// main is intentionally empty. A main package must declare it, but this code
// is presumably built as a Fluent Bit plugin (e.g. -buildmode=c-shared), where
// only the exported FLBPlugin* callbacks are invoked.
func main() {
}

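The plugin registers as "clickhouse" and reads its connection parameters from environment variables. It buffers records in memory until at least the configured batch size (CLICKHOUSE_BATCH_SIZE, default 1024) has accumulated, then writes the whole buffer to ClickHouse in one transaction using a prepared INSERT statement. Because writes are triggered by buffer size alone, records can wait a long time before they are persisted when the log rate is low.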

ClickHouse Table Schema

-- Database and table written by the Fluent Bit ClickHouse output plugin.
CREATE DATABASE IF NOT EXISTS scmp;
-- One row per container log line. `date` drives the daily partitioning. It
-- defaults to the day of `ts`, so rows inserted without an explicit date land in
-- the correct partition instead of all collapsing into 1970-01-01.
CREATE TABLE IF NOT EXISTS scmp.logs(
    date Date DEFAULT toDate(ts),
    cluster String,
    namespace String,
    app String,
    pod_name String,
    container_name String,
    host String,
    log String,
    ts DateTime
) ENGINE = MergeTree()
PARTITION BY toYYYYMMDD(date)
ORDER BY (cluster, namespace, app, pod_name, container_name, host, ts);

Kubernetes Deployment Resources

Configuration files are stored in a ConfigMap and mounted into each Fluent Bit pod.

# Fluent Bit configuration. The DaemonSet mounts this ConfigMap at
# /fluent-bit/etc/, replacing the image's own files there, so every file the
# configuration references (including parsers.conf) must be defined here.
apiVersion: v1
kind: ConfigMap
metadata:
  name: k8s-log-agent-config
  namespace: kube
  labels:
    k8s-app: k8s-log-agent

data:
  fluent-bit.conf: |
    [SERVICE]
      Flush 1
      Log_Level error
      Daemon off
      Parsers_File parsers.conf
      HTTP_Server On
      HTTP_Listen 0.0.0.0
      HTTP_Port 2020
    @INCLUDE input-kubernetes.conf
    @INCLUDE filter-kubernetes.conf
    @INCLUDE output-kubernetes.conf
  input-kubernetes.conf: |
    [INPUT]
      Name tail
      Tag kube.*
      Path /var/log/containers/*.log
      Parser docker
      DB /var/log/flb_kube.db
      Mem_Buf_Limit 5MB
      Skip_Long_Lines On
      Refresh_Interval 10
  filter-kubernetes.conf: |
    [FILTER]
      Name kubernetes
      Match *
      Kube_URL https://kubernetes.default.svc.cluster.local:443
      Merge_Log On
      Annotations Off
      Kube_Tag_Prefix kube.var.log.containers.
      Merge_Log_Key log_processed
    [FILTER]
      Name modify
      Match *
      Set cluster ${CLUSTER_NAME}
  output-kubernetes.conf: |
    [OUTPUT]
      Name clickhouse
      Match *
  # Defines the "docker" parser used by the tail input (Docker JSON log files).
  parsers.conf: |
    [PARSER]
      Name        docker
      Format      json
      Time_Key    time
      Time_Format %Y-%m-%dT%H:%M:%S.%L
      Time_Keep   On
RBAC permissions allow the DaemonSet to read pods and namespaces.

# Binds the read-only ClusterRole to the agent's ServiceAccount. The subject's
# namespace must match the namespace where the ServiceAccount and the DaemonSet
# are created (kube); otherwise the binding targets a nonexistent account.
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: k8s-log-agent-read
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: k8s-log-agent-read
subjects:
- kind: ServiceAccount
  name: k8s-log-agent
  namespace: kube
---
# Read access to pods and namespaces.
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: k8s-log-agent-read
rules:
- apiGroups: [""]
  resources:
  - namespaces
  - pods
  verbs: ["get","list","watch"]
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: k8s-log-agent
  namespace: kube

The DaemonSet runs Fluent Bit on every node, mounts the ConfigMap, and injects environment variables for ClickHouse connection details.

# ClickHouse password for the output plugin. Keeping it in a Secret keeps the
# credential out of the DaemonSet spec, which is readable by anyone allowed to
# read DaemonSets.
apiVersion: v1
kind: Secret
metadata:
  name: k8s-log-agent-clickhouse
  namespace: kube
  labels:
    k8s-app: k8s-log-agent
type: Opaque
stringData:
  password: "EBupt123"
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: k8s-log-agent
  namespace: kube
  labels:
    k8s-app: k8s-log-agent
    version: v1
spec:
  selector:
    matchLabels:
      k8s-app: k8s-log-agent
  template:
    metadata:
      labels:
        k8s-app: k8s-log-agent
        version: v1
    spec:
      containers:
      - name: fluent-bit
        image: iyacontrol/fluent-bit-ck:1.2.2
        ports:
        - containerPort: 2020
        resources:
          limits:
            cpu: 200m
            memory: 200Mi
          requests:
            cpu: 200m
            memory: 200Mi
        env:
        - name: CLUSTER_NAME
          value: "clickhouse"
        - name: CLICKHOUSE_HOST
          value: "10.1.62.62:9150"
        - name: CLICKHOUSE_USER
          value: "oms"
        - name: CLICKHOUSE_PASSWORD
          valueFrom:
            secretKeyRef:
              name: k8s-log-agent-clickhouse
              key: password
        - name: CLICKHOUSE_DATABASE
          value: "scmp"
        - name: CLICKHOUSE_TABLE
          value: "logs"
        - name: NODENAME
          valueFrom:
            fieldRef:
              fieldPath: spec.nodeName
        volumeMounts:
        - name: varlog
          mountPath: /var/log
        - name: varlibdockercontainers
          mountPath: /var/lib/docker/containers
          readOnly: true
        - name: k8s-log-agent-config
          mountPath: /fluent-bit/etc/
      terminationGracePeriodSeconds: 10
      volumes:
      - name: varlog
        hostPath:
          path: /var/log
      - name: varlibdockercontainers
        hostPath:
          path: /var/lib/docker/containers
      - name: k8s-log-agent-config
        configMap:
          name: k8s-log-agent-config
      serviceAccountName: k8s-log-agent
      tolerations:
      - key: node-role.kubernetes.io/master
        operator: Exists
        effect: NoSchedule
      # Newer Kubernetes releases taint control-plane nodes with this key.
      - key: node-role.kubernetes.io/control-plane
        operator: Exists
        effect: NoSchedule

Performance Test

The test ran Fluent Bit for ten minutes at each of several log generation rates (5000, 1000, 500, 200, 50, and 1 log per second). htop was used to observe CPU and memory consumption, and write latency was tracked until a log backlog appeared.

Results showed that at 1000 log/s and 500 log/s the CPU usage remained stable within a 10 % fluctuation band. Higher or lower rates (5000 log/s, 200 log/s, 50 log/s, 1 log/s) caused larger CPU swings, with peaks up to 52 % and troughs down to 35 %.

Latency analysis indicated that after three minutes of sustained load, the backlog grew to a point where logs generated within a ten‑second window were not yet persisted, especially at the highest and lowest rates.

Overall, the pipeline demonstrates reliable log collection and storage at moderate rates, while extreme rates expose resource saturation and increased latency.

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.

Kubernetes · Performance Testing · ClickHouse · log collection · Fluent Bit · Go Plugin
Cloud Native Technology Community
Written by

Cloud Native Technology Community

The Cloud Native Technology Community, part of the CNBPA Cloud Native Technology Practice Alliance, focuses on evangelizing cutting‑edge cloud‑native technologies and practical implementations. It shares in‑depth content, case studies, and event/meetup information on containers, Kubernetes, DevOps, Service Mesh, and other cloud‑native tech, along with updates from the CNBPA alliance.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.