How We Built a Real‑Time Log Collection System with Go, Kafka, and Etcd

This article examines the shortcomings of a legacy log‑collection setup and introduces a streamlined real‑time architecture that eliminates per‑machine Hadoop clients and centralizes configuration in Etcd behind a web UI. It then walks through the Go implementation: configuration design, Etcd watching, Kafka integration, tail management, and per‑second rate limiting, with code snippets and resource‑usage graphs from the old system.

21CTO

Log collection is a standard requirement for any company operating at scale, and a stable, low‑maintenance system benefits both operations engineers and log consumers. The original system suffered from multiple agents per host, a separate Hadoop client on every machine, hourly or five‑minute batch uploads, and severe load spikes at the top of each hour.

1. Problems with the Old System

Each machine ran a log‑collection agent and an upload agent, which required a Hadoop client to be installed on the host. The agents sliced logs and, on a schedule, packaged one hour or five minutes of logs for upload to the Hadoop cluster. Real‑time collection used yet another agent that streamed logs to Kafka. Issues included:

No unified configuration entry; configuration had to be applied manually on thousands of machines.

Multiple Hadoop clients per host increased operational complexity.

The lack of rate limiting put heavy pressure on hosts at the top of each hour, as the following metrics show.

CPU usage:

CPU usage graph

Load:

Load graph

Network card usage:

Network usage graph

2. New System Architecture

The redesign simplifies the pipeline to a single real‑time collection mode, eliminating the need for per‑machine Hadoop clients. Configuration is centralized in Etcd and managed through a friendly web interface, allowing users to specify a service and its log files once for all machines. Additional improvements include:

Unified configuration management with web UI.

Traffic shaping to prevent spikes even with real‑time collection.

Support for log replay.

The overall architecture consists of three Go components:

logagent: watches Etcd for configuration changes, tails log files, applies per‑second rate limiting, and sends log lines to Kafka.

web: provides APIs to update Etcd configuration, intended to be integrated into a CMDB or resource‑management system.

transfer: consumes Kafka messages and forwards them to Elasticsearch, Hadoop, or Storm.

Source code is available at https://github.com/zingp/logagent .

Architecture diagram:

Log system architecture

3. Implementation Details

3.1 Configuration Design

The main configuration file (INI format) defines Etcd and Kafka addresses, timeouts, thread counts, and log file locations. Example:

etcd_addr = 10.134.123.183:2379   # Etcd address
etcd_timeout = 5                # Connection timeout (seconds)
etcd_watch_key = /logagent/%s/logconfig   # Watch key template; %s is replaced with the host IP

kafka_addr = 10.134.123.183:9092 # Kafka address

thread_num = 4                  # Number of Kafka sender goroutines
log = ./log/logagent.log        # Agent log file
level = debug                   # Log level

# Log monitoring configuration (stored in Etcd)
# Each entry specifies service name, log path, Kafka topic, and send rate limit.
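
To make the flat key = value layout concrete, here is a minimal sketch of loading such a file with only the Go standard library. The parseINI helper is hypothetical and is not taken from the logagent source, which may well rely on a configuration library instead. It assumes there are no [section] headers and that everything after a # is a comment.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// parseINI reads flat "key = value" lines into a map. Blank lines and
// anything after a '#' are ignored. Section headers are not supported
// (an assumption made for this sketch).
func parseINI(text string) (map[string]string, error) {
	conf := make(map[string]string)
	sc := bufio.NewScanner(strings.NewReader(text))
	for n := 1; sc.Scan(); n++ {
		line := sc.Text()
		if i := strings.IndexByte(line, '#'); i >= 0 {
			line = line[:i] // drop the trailing comment
		}
		line = strings.TrimSpace(line)
		if line == "" {
			continue
		}
		parts := strings.SplitN(line, "=", 2)
		if len(parts) != 2 {
			return nil, fmt.Errorf("line %d: expected key = value", n)
		}
		conf[strings.TrimSpace(parts[0])] = strings.TrimSpace(parts[1])
	}
	return conf, sc.Err()
}

func main() {
	sample := "etcd_addr = 10.134.123.183:2379   # Etcd address\nthread_num = 4\n"
	conf, err := parseINI(sample)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("etcd_addr:", conf["etcd_addr"])
	fmt.Println("thread_num:", conf["thread_num"])
}
```

Values come back as strings, so numeric settings such as etcd_timeout and thread_num would still need converting (for example with strconv.Atoi) before use.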

Etcd stores a JSON array of objects, for example:

[
  {
    "service": "test_service",
    "log_path": "/search/nginx/logs/ping-android.shouji.sogou.com_access_log",
    "topic": "nginx_log",
    "send_rate": 1000
  },
  {
    "service": "srv.android.shouji.sogou.com",
    "log_path": "/search/nginx/logs/srv.android.shouji.sogou.com_access_log",
    "topic": "nginx_log",
    "send_rate": 2000
  }
]
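
As an illustration of how the agent might decode this value, the sketch below maps each JSON object onto a Go struct. The field names and the parseLogConfigs helper are assumptions made for this example; only the JSON keys come from the sample above.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// LogConfig mirrors one object in the JSON array stored in Etcd.
// The Go field names are assumptions; the JSON keys match the sample.
type LogConfig struct {
	Service  string `json:"service"`
	LogPath  string `json:"log_path"`
	Topic    string `json:"topic"`
	SendRate int    `json:"send_rate"`
}

// parseLogConfigs decodes the raw Etcd value into a slice of LogConfig.
func parseLogConfigs(raw []byte) ([]LogConfig, error) {
	var confs []LogConfig
	if err := json.Unmarshal(raw, &confs); err != nil {
		return nil, fmt.Errorf("decode log config: %w", err)
	}
	return confs, nil
}

func main() {
	raw := []byte(`[
	  {"service": "test_service",
	   "log_path": "/search/nginx/logs/ping-android.shouji.sogou.com_access_log",
	   "topic": "nginx_log",
	   "send_rate": 1000}
	]`)
	confs, err := parseLogConfigs(raw)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, c := range confs {
		fmt.Printf("%s -> topic %s, send rate %d\n", c.LogPath, c.Topic, c.SendRate)
	}
}
```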

3.2 Etcd Interaction

Two functions handle Etcd: initEtcd connects to Etcd, pulls the initial configuration for each host (key format /logagent/<IP>/logconfig), and starts a watcher goroutine. etcdWatch continuously monitors the keys and pushes updated JSON strings onto a channel for the tail manager to reload.

func initEtcd(addr []string, keyFormat string, timeout time.Duration) (err error) {
    // Connect to Etcd, fetch initial configs, start watcher
}

func etcdWatch(keys []string) {
    // Watch each key and forward changes via confChan
}
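
The two steps described above, expanding the key template per host IP and forwarding every change onto one channel, can be sketched without an Etcd client. In this example buildWatchKeys and forwardUpdates are hypothetical helpers, and plain Go channels stand in for the Etcd watch streams so that the code runs without a cluster.

```go
package main

import (
	"fmt"
	"sync"
)

// buildWatchKeys expands the etcd_watch_key template once per local IP.
func buildWatchKeys(keyFormat string, ips []string) []string {
	keys := make([]string, 0, len(ips))
	for _, ip := range ips {
		keys = append(keys, fmt.Sprintf(keyFormat, ip))
	}
	return keys
}

// forwardUpdates copies every value from the per-key streams onto confChan
// and closes confChan once all streams are closed. The streams are an
// assumption standing in for Etcd watch channels.
func forwardUpdates(streams []<-chan string, confChan chan<- string) {
	var wg sync.WaitGroup
	for _, s := range streams {
		wg.Add(1)
		go func(s <-chan string) {
			defer wg.Done()
			for v := range s {
				confChan <- v
			}
		}(s)
	}
	go func() {
		wg.Wait()
		close(confChan)
	}()
}

func main() {
	keys := buildWatchKeys("/logagent/%s/logconfig", []string{"10.134.123.183"})
	fmt.Println("watching:", keys)

	stream := make(chan string, 1)
	confChan := make(chan string)
	forwardUpdates([]<-chan string{stream}, confChan)

	stream <- `[{"service":"test_service"}]`
	close(stream)
	for conf := range confChan {
		fmt.Println("reload with:", conf)
	}
}
```

Funnelling every key into a single channel means the tail manager only has to read from one place, whichever of the host's IPs the configuration was written under.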

3.3 IP Discovery

Because a host may have multiple IPs, getLocalIP enumerates network interfaces and returns all global unicast addresses.

func getLocalIP() (ips []string, err error) {
    // Iterate over interfaces, collect IPv4/IPv6 global addresses
}
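
One possible way to implement this with the standard net package is sketched below. The split into a globalUnicast filter plus a thin getLocalIP wrapper, and the addrsFromCIDRs demo helper, are assumptions of this example rather than the layout of the original code.

```go
package main

import (
	"fmt"
	"net"
)

// globalUnicast keeps only global unicast IPs, dropping loopback and
// link-local addresses. Go counts private ranges such as 10.0.0.0/8 as
// global unicast, so internal addresses are kept.
func globalUnicast(addrs []net.Addr) []string {
	var ips []string
	for _, addr := range addrs {
		ipNet, ok := addr.(*net.IPNet)
		if !ok {
			continue
		}
		if ipNet.IP.IsGlobalUnicast() {
			ips = append(ips, ipNet.IP.String())
		}
	}
	return ips
}

// getLocalIP returns the global unicast addresses of all local interfaces.
func getLocalIP() ([]string, error) {
	addrs, err := net.InterfaceAddrs()
	if err != nil {
		return nil, err
	}
	return globalUnicast(addrs), nil
}

// addrsFromCIDRs builds interface-style addresses from CIDR strings so the
// filter can be tried without depending on the host's real interfaces.
func addrsFromCIDRs(cidrs ...string) ([]net.Addr, error) {
	addrs := make([]net.Addr, 0, len(cidrs))
	for _, c := range cidrs {
		ip, ipNet, err := net.ParseCIDR(c)
		if err != nil {
			return nil, err
		}
		ipNet.IP = ip // keep the host address, not the masked network address
		addrs = append(addrs, ipNet)
	}
	return addrs, nil
}

func main() {
	demo, err := addrsFromCIDRs("127.0.0.1/8", "10.134.123.183/24", "fe80::1/64")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("filtered demo addresses:", globalUnicast(demo))

	ips, err := getLocalIP()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("this host:", ips)
}
```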

3.4 Kafka Initialization

A synchronous Kafka producer is created, along with a configurable number of worker goroutines (thread_num) that read messages from a buffered channel and send each one to its topic.

type Message struct {
    line  string
    topic string
}

type KafkaSend struct {
    client   sarama.SyncProducer
    lineChan chan *Message
}

func initKafka(kafkaAddr string, threadNum int) error {
    // Create producer, start worker goroutines
}
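
The worker pattern can be sketched independently of Kafka. In the example below the sender interface and countingSender type are hypothetical stand-ins for sarama.SyncProducer so that the code runs without a broker, and startWorkers is an illustrative name, not a function from the original source.

```go
package main

import (
	"fmt"
	"log"
	"sync"
)

// Message pairs a log line with its destination topic, as in the article.
type Message struct {
	line  string
	topic string
}

// sender is a hypothetical stand-in for the Kafka producer.
type sender interface {
	Send(topic, line string) error
}

// startWorkers launches threadNum goroutines that drain lineChan and pass
// each message to the sender. The returned WaitGroup completes once
// lineChan has been closed and fully drained.
func startWorkers(s sender, lineChan <-chan *Message, threadNum int) *sync.WaitGroup {
	wg := &sync.WaitGroup{}
	for i := 0; i < threadNum; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for msg := range lineChan {
				if err := s.Send(msg.topic, msg.line); err != nil {
					log.Printf("send to %s failed: %v", msg.topic, err)
				}
			}
		}()
	}
	return wg
}

// countingSender counts messages instead of contacting a broker.
type countingSender struct {
	mu   sync.Mutex
	sent int
}

func (c *countingSender) Send(topic, line string) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.sent++
	return nil
}

func main() {
	lineChan := make(chan *Message, 100)
	cs := &countingSender{}
	wg := startWorkers(cs, lineChan, 4)

	lineChan <- &Message{line: "GET /ping 200", topic: "nginx_log"}
	lineChan <- &Message{line: "GET /srv 200", topic: "nginx_log"}
	close(lineChan)
	wg.Wait()
	fmt.Println("messages sent:", cs.sent)
}
```

The buffered channel decouples the tailing goroutines from Kafka latency: tails block only when the buffer is full, which gives a simple form of back-pressure.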

3.5 Tail Management and Rate Limiting

The TailMgr maintains a map of TailObj instances, each responsible for tailing a specific log file, applying a per‑second send‑rate limit, and forwarding lines to Kafka.

type TailObj struct {
    tail      *tail.Tail
    offset    int64
    logConf   LogConfig
    secLimit  *SecondLimit
    exitChan  chan bool
}

func (t *TailObj) readLog() {
    // Deferring Done marks the goroutine finished on every exit path
    defer waitGroup.Done()
    for line := range t.tail.Lines {
        // Trim, filter, send to Kafka, enforce rate limit
    }
}
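
When a new configuration arrives, the tail manager has to decide which tails to stop and which to start. The sketch below shows one way to compute that difference. The diffConfigs helper, keying the map by log path, and restarting a tail whenever its topic or rate changes are all assumptions of this example, as are the LogConfig field names.

```go
package main

import (
	"fmt"
	"sort"
)

// LogConfig describes one tailed file. Field names are assumptions.
type LogConfig struct {
	Service  string
	LogPath  string
	Topic    string
	SendRate int
}

// diffConfigs compares the running tails (keyed by log path) with the
// desired configuration. A path is stopped if it disappeared or changed,
// and a config is started if it is new or changed. Log paths in desired
// are assumed to be unique.
func diffConfigs(running map[string]LogConfig, desired []LogConfig) (toStart []LogConfig, toStop []string) {
	want := make(map[string]LogConfig, len(desired))
	for _, c := range desired {
		want[c.LogPath] = c
	}
	for path, cur := range running {
		if next, ok := want[path]; !ok || next != cur {
			toStop = append(toStop, path)
		}
	}
	for _, c := range desired {
		if cur, ok := running[c.LogPath]; !ok || cur != c {
			toStart = append(toStart, c)
		}
	}
	sort.Strings(toStop) // map iteration order is random; sort for stable output
	return toStart, toStop
}

func main() {
	running := map[string]LogConfig{
		"/logs/a": {Service: "a", LogPath: "/logs/a", Topic: "nginx_log", SendRate: 1000},
		"/logs/b": {Service: "b", LogPath: "/logs/b", Topic: "nginx_log", SendRate: 2000},
	}
	desired := []LogConfig{
		{Service: "a", LogPath: "/logs/a", Topic: "nginx_log", SendRate: 500},  // rate changed
		{Service: "c", LogPath: "/logs/c", Topic: "nginx_log", SendRate: 1000}, // newly added
	}
	toStart, toStop := diffConfigs(running, desired)
	fmt.Println("stop:", toStop)
	for _, c := range toStart {
		fmt.Println("start:", c.LogPath)
	}
}
```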

The SecondLimit struct tracks the number of messages sent in the current second and sleeps when the configured limit is reached.

type SecondLimit struct {
    unixSecond int64
    curCount   int32
    limit      int32
}

func (s *SecondLimit) Add(count int) { /* atomic increment */ }
func (s *SecondLimit) Wait() bool { /* sleep until next second if limit exceeded */ }
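
A minimal sketch of such a limiter follows. It keeps the struct fields shown above, but the method bodies and the NewSecondLimit constructor are assumptions of this example, not the original implementation. It also assumes that each limiter is driven by a single tailing goroutine, so resetting the counter does not need a lock.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// SecondLimit counts messages sent during the current wall-clock second.
type SecondLimit struct {
	unixSecond int64
	curCount   int32
	limit      int32
}

// NewSecondLimit is a hypothetical constructor for this sketch.
func NewSecondLimit(limit int32) *SecondLimit {
	return &SecondLimit{unixSecond: time.Now().Unix(), limit: limit}
}

// Add records count sent messages and restarts the counter when the clock
// has moved on to a new second.
func (s *SecondLimit) Add(count int) {
	now := time.Now().Unix()
	if atomic.LoadInt64(&s.unixSecond) != now {
		atomic.StoreInt64(&s.unixSecond, now)
		atomic.StoreInt32(&s.curCount, int32(count))
		return
	}
	atomic.AddInt32(&s.curCount, int32(count))
}

// Wait sleeps until the next second begins while the current second's
// budget is used up. It reports whether it had to sleep.
func (s *SecondLimit) Wait() bool {
	slept := false
	for {
		now := time.Now()
		sameSecond := atomic.LoadInt64(&s.unixSecond) == now.Unix()
		if !sameSecond || atomic.LoadInt32(&s.curCount) < s.limit {
			return slept
		}
		// Sleep for whatever remains of the current second.
		time.Sleep(time.Second - time.Duration(now.Nanosecond()))
		slept = true
	}
}

func main() {
	limiter := NewSecondLimit(3)
	start := time.Now()
	for i := 0; i < 7; i++ {
		limiter.Add(1) // one line sent
		if limiter.Wait() {
			fmt.Printf("line %d: throttled until the next second\n", i)
		}
	}
	fmt.Println("elapsed:", time.Since(start).Round(time.Millisecond))
}
```

Because the window is aligned to wall-clock seconds, a burst can straddle a second boundary and briefly reach up to twice the limit. A token bucket would smooth that out at the cost of a little more state.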

4. Conclusion

The new Go‑based log collection system resolves the major pain points of the legacy solution: it reduces per‑machine dependencies, centralizes configuration, provides real‑time ingestion with traffic shaping, and offers a clear path for log replay. The complete source code is hosted on GitHub for further exploration.

Source: http://www.cnblogs.com/zingp/p/9365010.html
