Build a Real‑Time eBPF‑Based Kubernetes Network Anomaly Detector

This article walks through designing and implementing a zero‑intrusion, real‑time network anomaly detection system for Kubernetes using eBPF, covering architecture, kernel‑space eBPF programs, Go user‑space collectors, deployment via DaemonSet, performance optimizations, alerting integration with Prometheus/Grafana, and real‑world case studies.

MaGe Linux Operations

Why eBPF?

As a cloud‑native operations engineer, I have seen many production incidents caused by network problems. Traditional monitoring is reactive: by the time an issue is detected, users are already complaining. eBPF makes it possible to observe network events inside the kernel in real time, without modifying applications or loading kernel modules.

Pain Points of Traditional Network Monitoring

In Kubernetes environments, network issues have the following characteristics:

High complexity: Pod communication involves the CNI plugin, service mesh, load balancers, and other components.

Difficult troubleshooting: Problems are often discovered only after they affect users, because deep real‑time observability is missing.

High cost: Traditional APM tools are expensive and offer limited kernel‑level visibility.

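eBPF addresses these gaps by running verified, sandboxed programs at kernel hook points such as kprobes and tracepoints, so network events are observed where they happen, without touching application code.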

System Architecture Design

The system follows a layered architecture consisting of the following components:

┌─────────────────────────────────────────────────────────┐
│               Web Dashboard                             │
├─────────────────────────────────────────────────────────┤
│               Alert Manager                             │
├─────────────────────────────────────────────────────────┤
│               Data Processor                            │
├─────────────────────────────────────────────────────────┤
│               eBPF Data Collector                       │
├─────────────────────────────────────────────────────────┤
│               Kernel Space                              │
└─────────────────────────────────────────────────────────┘
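The article does not say how the layers talk to each other. As an assumption, here is a minimal sketch in which each layer is a goroutine connected by Go channels: the collector is simulated with a slice, and the processor applies a hypothetical per‑destination close threshold before handing alerts upward to where an alert manager or dashboard would consume them. Names such as PipelineEvent and runPipeline are illustrative only.

```go
package main

import "fmt"

// PipelineEvent is a hypothetical, simplified record produced by the collector layer.
type PipelineEvent struct {
    Dst    string
    Closed bool
}

// PipelineAlert is what the processor hands to the alerting layer.
type PipelineAlert struct {
    Message string
}

// collector stands in for the eBPF data collector: it forwards events downstream.
func collector(events []PipelineEvent, out chan<- PipelineEvent) {
    for _, e := range events {
        out <- e
    }
    close(out)
}

// processor counts closes per destination and emits one alert when a
// destination reaches the threshold.
func processor(in <-chan PipelineEvent, out chan<- PipelineAlert, threshold int) {
    closes := make(map[string]int)
    for e := range in {
        if !e.Closed {
            continue
        }
        closes[e.Dst]++
        if closes[e.Dst] == threshold {
            out <- PipelineAlert{Message: fmt.Sprintf("%d closes to %s", threshold, e.Dst)}
        }
    }
    close(out)
}

// runPipeline wires the layers together and collects the alerts that reach the top.
func runPipeline(events []PipelineEvent, threshold int) []PipelineAlert {
    evCh := make(chan PipelineEvent, 64)
    alertCh := make(chan PipelineAlert, 64)
    go collector(events, evCh)
    go processor(evCh, alertCh, threshold)
    var alerts []PipelineAlert
    for a := range alertCh {
        alerts = append(alerts, a)
    }
    return alerts
}
```

Channels keep each layer independently testable and let a slow consumer apply backpressure instead of dropping events silently.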

Core Implementation: eBPF Program Development

1. TCP Connection Anomaly Detection

The eBPF program attaches a kprobe to the kernel function tcp_set_state and reports sockets that move to TCP_CLOSE or TCP_TIME_WAIT:

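// tcp_monitor.bpf.c
// CO-RE build: generate vmlinux.h with
//   bpftool btf dump file /sys/kernel/btf/vmlinux format c > vmlinux.h
// and compile with clang -O2 -g -target bpf -D__TARGET_ARCH_x86.
#include "vmlinux.h"
#include <bpf/bpf_helpers.h>
#include <bpf/bpf_tracing.h>
#include <bpf/bpf_core_read.h>
#include <bpf/bpf_endian.h>

struct tcp_event {
    __u32 pid;
    __u32 saddr;      // network byte order
    __u32 daddr;      // network byte order
    __u16 sport;      // host byte order (skc_num)
    __u16 dport;      // converted to host byte order below
    __u8  state;
    __u8  pad[7];     // explicit padding: 32 bytes total, matching the Go struct
    __u64 timestamp;
};

struct {
    __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
    __uint(key_size, sizeof(__u32));
    __uint(value_size, sizeof(__u32));
} tcp_events SEC(".maps");

SEC("kprobe/tcp_set_state")
int trace_tcp_state_change(struct pt_regs *ctx) {
    struct sock *sk = (struct sock *)PT_REGS_PARM1(ctx);
    int new_state = PT_REGS_PARM2(ctx);

    // Only report terminal states; user space decides which closes are abnormal.
    if (new_state != TCP_CLOSE && new_state != TCP_TIME_WAIT)
        return 0;

    struct tcp_event event = {};
    __u16 dport = 0;
    event.timestamp = bpf_ktime_get_ns();
    // tcp_set_state can run in softirq context, so this PID is not always the socket owner.
    event.pid = bpf_get_current_pid_tgid() >> 32;
    event.state = new_state;

    // Get connection info (CO-RE relocated field reads)
    BPF_CORE_READ_INTO(&event.saddr, sk, __sk_common.skc_rcv_saddr);
    BPF_CORE_READ_INTO(&event.daddr, sk, __sk_common.skc_daddr);
    BPF_CORE_READ_INTO(&event.sport, sk, __sk_common.skc_num);
    BPF_CORE_READ_INTO(&dport, sk, __sk_common.skc_dport);
    event.dport = bpf_ntohs(dport);

    bpf_perf_event_output(ctx, &tcp_events, BPF_F_CURRENT_CPU, &event, sizeof(event));
    return 0;
}

char LICENSE[] SEC("license") = "GPL";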

2. Go User‑Space Program

The user‑space collector loads the eBPF object, attaches kprobes, reads events, and processes them:

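// main.go
package main

import (
    "bytes"
    "encoding/binary"
    "errors"
    "fmt"
    "log"
    "net"
    "time"

    "github.com/cilium/ebpf"
    "github.com/cilium/ebpf/link"
    "github.com/cilium/ebpf/perf"
    "github.com/cilium/ebpf/rlimit"
)

const tcpClose = 7 // TCP_CLOSE in include/net/tcp_states.h

// TCPEvent mirrors struct tcp_event in tcp_monitor.bpf.c, including its padding.
type TCPEvent struct {
    PID       uint32
    SrcAddr   uint32 // network byte order
    DstAddr   uint32 // network byte order
    SrcPort   uint16
    DstPort   uint16
    State     uint8
    _         [7]byte // alignment padding before Timestamp; skipped by binary.Read
    Timestamp uint64
}

// Alert is the payload passed to the alerting layer.
type Alert struct {
    Type      string
    Severity  string
    Message   string
    Timestamp time.Time
    Metadata  map[string]interface{}
}

type NetworkMonitor struct {
    collection *ebpf.Collection
    reader     *perf.Reader
    links      []link.Link
}

func NewNetworkMonitor() (*NetworkMonitor, error) {
    // Needed on kernels older than 5.11, which charge BPF maps to RLIMIT_MEMLOCK.
    if err := rlimit.RemoveMemlock(); err != nil {
        return nil, fmt.Errorf("remove memlock: %w", err)
    }
    collection, err := ebpf.LoadCollection("tcp_monitor.o")
    if err != nil {
        return nil, fmt.Errorf("load eBPF program: %w", err)
    }
    kp, err := link.Kprobe("tcp_set_state", collection.Programs["trace_tcp_state_change"], nil)
    if err != nil {
        collection.Close()
        return nil, fmt.Errorf("attach kprobe: %w", err)
    }
    // 4096-byte buffer per CPU; increase it if samples are being lost.
    reader, err := perf.NewReader(collection.Maps["tcp_events"], 4096)
    if err != nil {
        kp.Close()
        collection.Close()
        return nil, fmt.Errorf("create perf reader: %w", err)
    }
    return &NetworkMonitor{collection: collection, reader: reader, links: []link.Link{kp}}, nil
}

// Close detaches the probe and releases kernel resources.
func (nm *NetworkMonitor) Close() {
    nm.reader.Close()
    for _, l := range nm.links {
        l.Close()
    }
    nm.collection.Close()
}

func (nm *NetworkMonitor) Start() error {
    log.Println("Monitoring TCP connection state changes...")
    for {
        record, err := nm.reader.Read()
        if err != nil {
            if errors.Is(err, perf.ErrClosed) {
                return nil
            }
            return fmt.Errorf("read perf event: %w", err)
        }
        if record.LostSamples > 0 {
            log.Printf("perf buffer overflow: lost %d samples", record.LostSamples)
            continue
        }
        var event TCPEvent
        // Fields are decoded in native byte order; this assumes a little-endian node (x86-64, arm64).
        if err := binary.Read(bytes.NewReader(record.RawSample), binary.LittleEndian, &event); err != nil {
            log.Printf("decode event: %v", err)
            continue
        }
        nm.processEvent(&event)
    }
}

func (nm *NetworkMonitor) processEvent(event *TCPEvent) {
    srcIP := intToIP(event.SrcAddr)
    dstIP := intToIP(event.DstAddr)
    if event.State == tcpClose {
        log.Printf("Connection closed: %s:%d -> %s:%d (PID: %d)", srcIP, event.SrcPort, dstIP, event.DstPort, event.PID)
        if nm.isAbnormalClose(event) {
            nm.triggerAlert(event)
        }
    }
}

func (nm *NetworkMonitor) isAbnormalClose(event *TCPEvent) bool {
    // Placeholder for an anomaly detection algorithm (e.g., ML model or rule engine)
    return nm.checkConnectionFlood(event)
}

func (nm *NetworkMonitor) checkConnectionFlood(event *TCPEvent) bool {
    // Stub: a real check would count closes per destination in a short time window.
    return false
}

func (nm *NetworkMonitor) triggerAlert(event *TCPEvent) {
    alert := Alert{
        Type:      "connection_abnormal",
        Severity:  "warning",
        Message:   fmt.Sprintf("Abnormal connection close detected: PID %d", event.PID),
        Timestamp: time.Now(),
        Metadata: map[string]interface{}{
            "src_ip":   intToIP(event.SrcAddr).String(),
            "dst_ip":   intToIP(event.DstAddr).String(),
            "src_port": event.SrcPort,
            "dst_port": event.DstPort,
        },
    }
    nm.sendAlert(alert)
}

// sendAlert is a stub; forward the alert to Alertmanager, a webhook, etc.
func (nm *NetworkMonitor) sendAlert(alert Alert) {
    log.Printf("ALERT [%s] %s: %s %v", alert.Severity, alert.Type, alert.Message, alert.Metadata)
}

// intToIP converts an address the kernel stored in network byte order. Decoding
// the u32 as little-endian and writing it back the same way restores the
// original byte sequence, i.e. dotted-quad order.
func intToIP(addr uint32) net.IP {
    ip := make(net.IP, 4)
    binary.LittleEndian.PutUint32(ip, addr)
    return ip
}

func main() {
    nm, err := NewNetworkMonitor()
    if err != nil {
        log.Fatal(err)
    }
    defer nm.Close()
    if err := nm.Start(); err != nil {
        log.Fatal(err)
    }
}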

Deploying to Kubernetes

1. Create a DaemonSet

Run the monitor on every node:

apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: ebpf-network-monitor
  namespace: monitoring
spec:
  selector:
    matchLabels:
      app: ebpf-network-monitor
  template:
    metadata:
      labels:
        app: ebpf-network-monitor
    spec:
      hostNetwork: true
      hostPID: true
      containers:
      - name: monitor
        image: ebpf-network-monitor:latest
        securityContext:
          privileged: true
        volumeMounts:
        - name: sys-kernel-debug
          mountPath: /sys/kernel/debug
        - name: lib-modules
          mountPath: /lib/modules
        - name: usr-src
          mountPath: /usr/src
        env:
        - name: NODE_NAME
          valueFrom:
            fieldRef:
              fieldPath: spec.nodeName
      volumes:
      - name: sys-kernel-debug
        hostPath:
          path: /sys/kernel/debug
      - name: lib-modules
        hostPath:
          path: /lib/modules
      - name: usr-src
        hostPath:
          path: /usr/src
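      serviceAccountName: ebpf-monitor
      # Required for cluster DNS resolution when hostNetwork is true.
      dnsPolicy: ClusterFirstWithHostNet
      # Tolerate all taints so the monitor also runs on control-plane nodes.
      tolerations:
      - operator: Exists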
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: ebpf-monitor
  namespace: monitoring
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: ebpf-monitor
rules:
- apiGroups: [""]
  resources: ["pods", "nodes"]
  verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: ebpf-monitor
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: ebpf-monitor
subjects:
- kind: ServiceAccount
  name: ebpf-monitor
  namespace: monitoring

2. Add Network‑Policy Violation Detection

Extend the eBPF program to monitor policy violations:

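// network_policy.bpf.c: same includes as tcp_monitor.bpf.c
struct policy_event {
    __u64 timestamp;
    __u32 src_ip, dst_ip;  // network byte order
    __u32 protocol, pad;   // explicit padding, so no uninitialized holes
};
struct {
    __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
    __uint(key_size, sizeof(__u32));
    __uint(value_size, sizeof(__u32));
} policy_events SEC(".maps");
// Hypothetical deny list of destination IPs, populated from user space.
struct {
    __uint(type, BPF_MAP_TYPE_HASH);
    __uint(max_entries, 10240);
    __type(key, __u32);
    __type(value, __u8);
} denied_dsts SEC(".maps");

SEC("kprobe/ip_rcv")
int trace_packet_receive(struct pt_regs *ctx) {
    struct sk_buff *skb = (struct sk_buff *)PT_REGS_PARM1(ctx);
    struct iphdr ip = {};
    // At ip_rcv the Ethernet header is already pulled, so skb->data is the IP header.
    if (bpf_probe_read_kernel(&ip, sizeof(ip), BPF_CORE_READ(skb, data)) < 0)
        return 0;
    // Placeholder policy check: report only destinations on the deny list.
    if (!bpf_map_lookup_elem(&denied_dsts, &ip.daddr))
        return 0;
    struct policy_event event = {
        .timestamp = bpf_ktime_get_ns(),
        .src_ip = ip.saddr, .dst_ip = ip.daddr, .protocol = ip.protocol,
    };
    bpf_perf_event_output(ctx, &policy_events, BPF_F_CURRENT_CPU, &event, sizeof(event));
    return 0;
}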

Practical Optimization Techniques

1. Performance Optimization

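// Batch events to amortize downstream per-event costs (locking, network I/O).
// Requires "sync" and "time".
type EventBatcher struct {
    mutex  sync.Mutex
    events []TCPEvent
    timer  *time.Timer
    // process handles one event, e.g. NetworkMonitor.processEvent. It may be
    // called from the timer goroutine, so it must be safe for concurrent use.
    process func(*TCPEvent)
}

func (eb *EventBatcher) AddEvent(event TCPEvent) {
    eb.mutex.Lock()
    eb.events = append(eb.events, event)
    if len(eb.events) < 100 {
        // Start a timer on the first event so partial batches flush within 100 ms.
        if eb.timer == nil {
            eb.timer = time.AfterFunc(100*time.Millisecond, eb.flush)
        }
        eb.mutex.Unlock()
        return
    }
    batch := eb.takeLocked()
    eb.mutex.Unlock()
    eb.processBatch(batch) // process outside the lock to avoid deadlock and contention
}

func (eb *EventBatcher) flush() {
    eb.mutex.Lock()
    batch := eb.takeLocked()
    eb.mutex.Unlock()
    eb.processBatch(batch)
}

// takeLocked detaches the pending batch and stops its timer; caller holds eb.mutex.
func (eb *EventBatcher) takeLocked() []TCPEvent {
    if eb.timer != nil {
        eb.timer.Stop()
        eb.timer = nil
    }
    batch := eb.events
    eb.events = nil
    return batch
}

func (eb *EventBatcher) processBatch(batch []TCPEvent) {
    for i := range batch {
        eb.process(&batch[i])
    }
}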

2. Intelligent Anomaly Detection

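// Statistics-based anomaly detector (requires "fmt", "sync", "time").
type AnomalyDetector struct {
    mutex       sync.Mutex // guards the map and the counters inside it
    connections map[string]*ConnectionStats
}

type ConnectionStats struct {
    Count      int64
    LastSeen   time.Time
    Failures   int64
    AvgLatency float64 // not populated by TCP state events in this example
}

func NewAnomalyDetector() *AnomalyDetector {
    return &AnomalyDetector{connections: make(map[string]*ConnectionStats)}
}

func (ad *AnomalyDetector) DetectAnomaly(event *TCPEvent) bool {
    // Key on src IP -> dst IP:port. Including the ephemeral source port would give
    // almost every connection its own key, so Count would rarely pass the threshold.
    key := fmt.Sprintf("%s->%s:%d", intToIP(event.SrcAddr), intToIP(event.DstAddr), event.DstPort)

    // One lock for lookup, insert, and update avoids racing on the counters.
    // Entries are never evicted here; prune by LastSeen in production.
    ad.mutex.Lock()
    defer ad.mutex.Unlock()
    stats, exists := ad.connections[key]
    if !exists {
        stats = &ConnectionStats{}
        ad.connections[key] = stats
    }
    stats.Count++
    stats.LastSeen = time.Now()
    if event.State == 7 { // TCP_CLOSE
        stats.Failures++
        failureRate := float64(stats.Failures) / float64(stats.Count)
        return failureRate > 0.1 && stats.Count > 10
    }
    return false
}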
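The detector above uses a fixed rule: more than 10 events and a close ratio above 10%. A common statistical alternative, swapped in here as an illustration rather than the author's method, is to learn a baseline for a per‑interval count (for example, closes per second to one destination) with an exponentially weighted moving average and variance, and flag intervals that rise more than k standard deviations above it. The alpha, k, warmup, and the floor of one count on the standard deviation are hypothetical tuning choices.

```go
package main

import "math"

// ewmaDetector keeps an exponentially weighted mean and variance of a
// per-interval count and flags values far above the learned baseline.
type ewmaDetector struct {
    alpha    float64 // smoothing factor in (0, 1]
    k        float64 // standard deviations above the mean that count as anomalous
    warmup   int     // observations to learn from before flagging
    n        int
    mean     float64
    variance float64
}

// observe feeds one interval's count and reports whether it is anomalous
// relative to the baseline learned from earlier intervals. Only upward
// deviations are flagged.
func (d *ewmaDetector) observe(x float64) bool {
    d.n++
    if d.n == 1 {
        d.mean = x
        return false
    }
    diff := x - d.mean
    // Floor the standard deviation at 1 count so a perfectly flat baseline
    // does not turn every small increase into an alert.
    std := math.Max(math.Sqrt(d.variance), 1)
    anomalous := d.n > d.warmup && diff > d.k*std
    // Incremental EWMA update of mean and variance.
    incr := d.alpha * diff
    d.mean += incr
    d.variance = (1 - d.alpha) * (d.variance + diff*incr)
    return anomalous
}
```

Unlike a fixed ratio, the baseline adapts to each destination's normal churn, which matters for services that legitimately open and close many short connections.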

Alerting and Visualization

1. Prometheus Integration

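// metrics.go
package main

import (
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
)

// Expose these on /metrics with promhttp.Handler() from client_golang/prometheus/promhttp.
var (
    // Per-IP labels can create many series on busy nodes; consider pod/namespace labels instead.
    tcpConnectionsTotal = promauto.NewCounterVec(prometheus.CounterOpts{Name: "tcp_connections_total",
        Help: "TCP state transitions (CLOSE, TIME_WAIT) observed by the eBPF probe"}, []string{"src_ip", "dst_ip", "state"})
    networkAnomaliesTotal = promauto.NewCounterVec(prometheus.CounterOpts{Name: "network_anomalies_total",
        Help: "Total number of network anomalies detected"}, []string{"type", "severity"})
)

// tcpStateToString maps kernel TCP state numbers (include/net/tcp_states.h) to label values.
func tcpStateToString(state uint8) string {
    switch state {
    case 6:
        return "TIME_WAIT"
    case 7:
        return "CLOSE"
    default:
        return "OTHER"
    }
}

// anomalous comes from the caller's detector, e.g. AnomalyDetector.DetectAnomaly.
func updateMetrics(event *TCPEvent, anomalous bool) {
    tcpConnectionsTotal.WithLabelValues(intToIP(event.SrcAddr).String(), intToIP(event.DstAddr).String(), tcpStateToString(event.State)).Inc()
    if anomalous {
        networkAnomaliesTotal.WithLabelValues("connection_anomaly", "warning").Inc()
    }
}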

2. Grafana Dashboard Configuration

{
  "dashboard": {
    "title": "eBPF Network Monitoring",
    "panels": [
      {
        "title": "TCP Connection States",
        "type": "stat",
        "targets": [{"expr": "sum by (state) (rate(tcp_connections_total[5m]))", "legendFormat": "{{state}}"}]
      },
      {
        "title": "Network Anomalies",
        "type": "timeseries",
        "targets": [{"expr": "sum by (type) (increase(network_anomalies_total[1h]))", "legendFormat": "{{type}}"}]
      }
    ]
  }
}

Real‑World Effects and Cases

In production deployments the system detected several types of network anomalies:

DNS resolution issues: Frequent slow DNS queries from a pod.

Connection‑pool exhaustion: Abnormal growth of connections between microservices.

Network partition: Immediate alerts when a node loses network connectivity.

Compared with traditional monitoring, the solution offers:

Zero intrusion: No changes to application code or configuration.

Real‑time: Events are captured in the kernel as they happen, with minimal latency.

Coverage: L3/L4 events are observed for every process and pod on the node, including traffic that application‑level instrumentation never sees.

Low cost: Fully open‑source, no license fees.

Conclusion and Outlook

By hooking kernel TCP and IP code paths with eBPF, we built a Kubernetes network anomaly detection system that addresses the shortcomings of traditional monitoring and provides real‑time, kernel‑level observability without modifying applications.

Next steps include integrating machine‑learning algorithms to improve detection accuracy, adding support for more protocols such as HTTP/2 and gRPC, and developing automatic remediation capabilities for true self‑healing.

Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.