How We Built a Real‑Time Log Collection System with Go, Kafka, and Etcd
This article examines the shortcomings of a legacy log-collection setup and introduces a streamlined real-time architecture that eliminates per-machine Hadoop clients and centralizes configuration in Etcd behind a web UI. It then details the Go implementation, covering configuration design, Etcd watching, Kafka integration, tail management, and per-second rate limiting, with code snippets and performance graphs.
Log collection becomes a standard requirement for companies at scale, and a stable, low‑maintenance system benefits both operations engineers and log consumers. The original system suffered from multiple agents per host, separate Hadoop clients, hourly or five‑minute batch uploads, and severe load spikes at the top of each hour.
1. Problems with the Old System
Each machine ran a log‑collection agent and an upload agent, requiring a Hadoop client to be mounted. The agents sliced logs and, at scheduled times, packaged one hour or five minutes of logs for upload to the Hadoop cluster. Real‑time collection used another agent that streamed logs to Kafka. Issues included:
No unified configuration entry; configuration had to be applied manually on thousands of machines.
Multiple Hadoop clients per host increased operational complexity.
Without rate limiting, uploads created heavy pressure at the top of each hour, as shown by the following metrics.
[Figures: CPU usage, load, and network card usage]
2. New System Architecture
The redesign simplifies the pipeline to a single real‑time collection mode, eliminating the need for per‑machine Hadoop clients. Configuration is centralized in Etcd and managed through a friendly web interface, allowing users to specify a service and its log files once for all machines. Additional improvements include:
Unified configuration management with web UI.
Traffic shaping to prevent spikes even with real‑time collection.
Support for log replay.
The overall architecture consists of three Go components:
logagent : watches Etcd for configuration changes, tails log files, applies per‑second rate limiting, and sends log lines to Kafka.
web : provides APIs to update Etcd configuration, intended to be integrated into a CMDB or resource‑management system.
transfer : consumes Kafka messages and forwards them to Elasticsearch, Hadoop, or Storm.
Source code is available at https://github.com/zingp/logagent .
Architecture diagram:
3. Implementation Details
3.1 Configuration Design
The main configuration file (INI format) defines Etcd and Kafka addresses, timeouts, thread counts, and log file locations. Example:
etcd_addr = 10.134.123.183:2379 # Etcd address
etcd_timeout = 5 # Connection timeout (seconds)
etcd_watch_key = /logagent/%s/logconfig
kafka_addr = 10.134.123.183:9092 # Kafka address
thread_num = 4 # Number of worker threads
log = ./log/logagent.log # Agent log file
level = debug # Log level
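The article does not show how the agent loads this file. As a minimal sketch, assuming a flat `key = value` layout with `#` comments like the example above, the settings could be read with the standard library alone. The names `parseConfig` and `sample` are illustrative and do not come from the project, which may well use an INI library instead.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// parseConfig reads flat "key = value" lines and drops anything after '#'.
// Hypothetical helper: values that themselves contain '#' are not supported.
func parseConfig(text string) (map[string]string, error) {
	conf := make(map[string]string)
	sc := bufio.NewScanner(strings.NewReader(text))
	for n := 1; sc.Scan(); n++ {
		line := sc.Text()
		if i := strings.Index(line, "#"); i >= 0 {
			line = line[:i] // strip a trailing or full-line comment
		}
		line = strings.TrimSpace(line)
		if line == "" {
			continue
		}
		key, val, ok := strings.Cut(line, "=")
		if !ok {
			return nil, fmt.Errorf("line %d: expected key = value", n)
		}
		conf[strings.TrimSpace(key)] = strings.TrimSpace(val)
	}
	return conf, sc.Err()
}

// sample repeats a few settings from the example configuration.
const sample = `etcd_addr = 10.134.123.183:2379 # Etcd address
etcd_timeout = 5 # Connection timeout (seconds)
etcd_watch_key = /logagent/%s/logconfig
thread_num = 4 # Number of worker threads`

func main() {
	conf, err := parseConfig(sample)
	if err != nil {
		panic(err)
	}
	fmt.Println(conf["etcd_addr"], conf["etcd_watch_key"], conf["thread_num"])
}
```

Numeric settings such as `etcd_timeout` and `thread_num` would still need converting, for example with `strconv.Atoi`, before use.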
The log-monitoring configuration is stored in Etcd as a JSON array of objects. Each entry specifies a service name, log path, Kafka topic, and send rate limit. For example:
[
{
"service": "test_service",
"log_path": "/search/nginx/logs/ping-android.shouji.sogou.com_access_log",
"topic": "nginx_log",
"send_rate": 1000
},
{
"service": "srv.android.shouji.sogou.com",
"log_path": "/search/nginx/logs/srv.android.shouji.sogou.com_access_log",
"topic": "nginx_log",
"send_rate": 2000
}
]
3.2 Etcd Interaction
Two functions handle Etcd: initEtcd connects to Etcd, pulls the initial configuration for each host (key format /logagent/<IP>/logconfig), and starts a watcher goroutine. etcdWatch continuously monitors the keys and pushes updated JSON strings onto a channel for the tail manager to reload.
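The two pure steps in this flow, expanding the key format per IP and decoding the JSON payload, can be sketched without an Etcd connection. This is an illustration under assumptions: the `LogConfig` field names are inferred from the JSON keys in section 3.1, and `watchKeys` and `decodeLogConfigs` are hypothetical helper names rather than functions from the project.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// LogConfig mirrors one object of the JSON array stored in Etcd.
// Field names are assumed from the JSON keys in the example config.
type LogConfig struct {
	Service  string `json:"service"`
	LogPath  string `json:"log_path"`
	Topic    string `json:"topic"`
	SendRate int    `json:"send_rate"`
}

// watchKeys expands the key format (e.g. /logagent/%s/logconfig)
// once per local IP address.
func watchKeys(keyFormat string, ips []string) []string {
	keys := make([]string, 0, len(ips))
	for _, ip := range ips {
		keys = append(keys, fmt.Sprintf(keyFormat, ip))
	}
	return keys
}

// decodeLogConfigs parses the JSON string read from, or pushed by, Etcd.
func decodeLogConfigs(raw string) ([]LogConfig, error) {
	var confs []LogConfig
	if err := json.Unmarshal([]byte(raw), &confs); err != nil {
		return nil, fmt.Errorf("decode log config: %w", err)
	}
	return confs, nil
}

func main() {
	fmt.Println(watchKeys("/logagent/%s/logconfig", []string{"10.0.0.1"}))

	raw := `[{"service":"test_service","log_path":"/tmp/a.log","topic":"nginx_log","send_rate":1000}]`
	confs, err := decodeLogConfigs(raw)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", confs)
}
```

Rejecting malformed JSON at this point keeps a bad update in Etcd from replacing a working configuration.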
func initEtcd(addr []string, keyFormat string, timeout time.Duration) (err error) {
// Connect to Etcd, fetch initial configs, start watcher
}
func etcdWatch(keys []string) {
// Watch each key and forward changes via confChan
}
3.3 IP Discovery
Because a host may have multiple IPs, getLocalIP enumerates network interfaces and returns all global unicast addresses.
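One way to implement this with the standard library is sketched below. It uses `net.InterfaceAddrs` and `IP.IsGlobalUnicast`; the helper name `usable` is hypothetical, and the project's own function may differ in detail.

```go
package main

import (
	"fmt"
	"net"
)

// usable reports whether ip should be used to build an Etcd key.
// IsGlobalUnicast rejects loopback, link-local, and multicast addresses,
// but still accepts private ranges such as 10.0.0.0/8.
func usable(ip net.IP) bool {
	return ip.IsGlobalUnicast()
}

// getLocalIP returns every global unicast address bound to the host.
func getLocalIP() (ips []string, err error) {
	addrs, err := net.InterfaceAddrs()
	if err != nil {
		return nil, fmt.Errorf("list interface addresses: %w", err)
	}
	for _, addr := range addrs {
		ipNet, ok := addr.(*net.IPNet)
		if !ok {
			continue // skip address types that carry no IP network
		}
		if usable(ipNet.IP) {
			ips = append(ips, ipNet.IP.String())
		}
	}
	return ips, nil
}

func main() {
	ips, err := getLocalIP()
	if err != nil {
		panic(err)
	}
	fmt.Println(ips)
}
```

Each returned address can then be substituted into the key format, so the agent watches one key per IP.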
func getLocalIP() (ips []string, err error) {
// Iterate over interfaces, collect IPv4/IPv6 global addresses
}
3.4 Kafka Initialization
initKafka creates a synchronous Kafka producer and starts a configurable number of worker goroutines that read messages from a buffered channel and send each one to its topic.
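The worker pattern can be sketched without a broker by hiding the producer behind a small interface. In this sketch the `sender` interface, `newKafkaSend`, `Close`, and `countingSender` are hypothetical names introduced for illustration; in the real agent the client is a `sarama.SyncProducer`, whose `SendMessage` method would sit behind `Send`.

```go
package main

import (
	"fmt"
	"sync"
)

// Message pairs a log line with its destination topic.
type Message struct {
	line  string
	topic string
}

// sender is a hypothetical stand-in for the Kafka producer so that
// the sketch runs without a broker.
type sender interface {
	Send(topic, line string) error
}

// KafkaSend fans messages from one buffered channel out to several workers.
type KafkaSend struct {
	client   sender
	lineChan chan *Message
	wg       sync.WaitGroup
}

// newKafkaSend starts threadNum workers reading from a channel of size buffer.
func newKafkaSend(client sender, threadNum, buffer int) *KafkaSend {
	k := &KafkaSend{client: client, lineChan: make(chan *Message, buffer)}
	for i := 0; i < threadNum; i++ {
		k.wg.Add(1)
		go k.worker()
	}
	return k
}

// worker sends queued messages until the channel is closed.
func (k *KafkaSend) worker() {
	defer k.wg.Done()
	for msg := range k.lineChan {
		if err := k.client.Send(msg.topic, msg.line); err != nil {
			fmt.Println("send failed:", err)
		}
	}
}

// Close stops accepting messages and waits for queued ones to be sent.
func (k *KafkaSend) Close() {
	close(k.lineChan)
	k.wg.Wait()
}

// countingSender is a test double that counts messages per topic.
type countingSender struct {
	mu     sync.Mutex
	counts map[string]int
}

func (c *countingSender) Send(topic, line string) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.counts[topic]++
	return nil
}

func main() {
	fake := &countingSender{counts: map[string]int{}}
	k := newKafkaSend(fake, 4, 16)
	for i := 0; i < 100; i++ {
		k.lineChan <- &Message{line: fmt.Sprintf("line %d", i), topic: "nginx_log"}
	}
	k.Close()
	fmt.Println("sent:", fake.counts["nginx_log"])
}
```

The buffered channel lets the tailing goroutines queue lines while the producer is briefly slow. Once the buffer fills, writers block, which applies backpressure instead of dropping lines.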
type Message struct {
line string
topic string
}
type KafkaSend struct {
client sarama.SyncProducer
lineChan chan *Message
}
func initKafka(kafkaAddr string, threadNum int) error {
// Create producer, start worker goroutines
}
3.5 Tail Management and Rate Limiting
The TailMgr maintains a map of TailObj instances, each responsible for tailing a specific log file, applying a per‑second send‑rate limit, and forwarding lines to Kafka.
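The article does not show how the manager reacts when a new configuration arrives. One plausible approach, sketched here purely as an assumption, is to diff the new entries against the running tails keyed by log path, then stop tails that disappeared or changed and start tails that are new or changed. The function `diffConfigs` and the `LogConfig` fields are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
)

// LogConfig holds one log-monitoring entry (fields assumed from the JSON).
type LogConfig struct {
	Service  string
	LogPath  string
	Topic    string
	SendRate int
}

// diffConfigs compares the running tails, keyed by log path, with a newly
// loaded configuration. It returns the entries to start and the paths to
// stop; a changed entry appears in both so that it is restarted.
func diffConfigs(running map[string]LogConfig, next []LogConfig) (start []LogConfig, stop []string) {
	wanted := make(map[string]LogConfig, len(next))
	for _, c := range next {
		wanted[c.LogPath] = c
	}
	for path, old := range running {
		if c, ok := wanted[path]; !ok || c != old {
			stop = append(stop, path)
		}
	}
	for _, c := range next {
		if old, ok := running[c.LogPath]; !ok || old != c {
			start = append(start, c)
		}
	}
	sort.Strings(stop) // map iteration order is random; sort for stable output
	return start, stop
}

func main() {
	running := map[string]LogConfig{
		"/logs/a": {Service: "a", LogPath: "/logs/a", Topic: "nginx_log", SendRate: 1000},
		"/logs/b": {Service: "b", LogPath: "/logs/b", Topic: "nginx_log", SendRate: 1000},
	}
	next := []LogConfig{
		{Service: "a", LogPath: "/logs/a", Topic: "nginx_log", SendRate: 2000}, // rate changed
		{Service: "c", LogPath: "/logs/c", Topic: "nginx_log", SendRate: 500},  // new file
	}
	start, stop := diffConfigs(running, next)
	fmt.Println("start:", start)
	fmt.Println("stop:", stop)
}
```

Stopping a tail would typically mean signalling its exit channel, which is presumably the role of the `exitChan` field on each tail object.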
type TailObj struct {
tail *tail.Tail
offset int64
logConf LogConfig
secLimit *SecondLimit
exitChan chan bool
}
func (t *TailObj) readLog() {
for line := range t.tail.Lines {
// Trim, filter, send to Kafka, enforce rate limit
}
waitGroup.Done()
}
The SecondLimit struct tracks the number of messages sent in the current second and sleeps when the configured limit is reached.
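A minimal sketch of such a limiter follows. It keeps the struct fields and the `Add` and `Wait` method names from the article, but the bodies are an assumption. The constructor `NewSecondLimit` and the helpers `addAt` and `exceededAt` are hypothetical additions that make the logic testable without real sleeps.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// SecondLimit counts messages sent within the current Unix second.
type SecondLimit struct {
	unixSecond int64
	curCount   int32
	limit      int32
}

// NewSecondLimit returns a limiter allowing limit messages per second.
func NewSecondLimit(limit int32) *SecondLimit {
	return &SecondLimit{unixSecond: time.Now().Unix(), limit: limit}
}

// addAt counts messages against second sec and resets the counter when the
// second changes. It assumes one goroutine owns the limiter, as with one
// reader per tailed file.
func (s *SecondLimit) addAt(sec int64, count int) {
	if atomic.SwapInt64(&s.unixSecond, sec) != sec {
		atomic.StoreInt32(&s.curCount, 0)
	}
	atomic.AddInt32(&s.curCount, int32(count))
}

// exceededAt reports whether the limit is used up within second sec.
func (s *SecondLimit) exceededAt(sec int64) bool {
	return atomic.LoadInt64(&s.unixSecond) == sec &&
		atomic.LoadInt32(&s.curCount) >= s.limit
}

// Add records count messages sent now.
func (s *SecondLimit) Add(count int) {
	s.addAt(time.Now().Unix(), count)
}

// Wait sleeps until the next second begins if the limit has been reached,
// and reports whether it slept.
func (s *SecondLimit) Wait() bool {
	now := time.Now()
	if !s.exceededAt(now.Unix()) {
		return false
	}
	time.Sleep(time.Second - time.Duration(now.Nanosecond()))
	return true
}

func main() {
	limiter := NewSecondLimit(2) // at most two lines per second
	begin := time.Now()
	for i := 0; i < 5; i++ {
		limiter.Wait()
		limiter.Add(1)
		fmt.Println("sent line", i, "after", time.Since(begin).Round(time.Millisecond))
	}
}
```

A fixed one-second window is simple but lets a burst through at the start of every second. A token bucket would smooth traffic further at the cost of more state.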
type SecondLimit struct {
unixSecond int64
curCount int32
limit int32
}
func (s *SecondLimit) Add(count int) { /* atomic increment */ }
func (s *SecondLimit) Wait() bool { /* sleep until next second if limit exceeded */ }
Conclusion
The new Go‑based log collection system resolves the major pain points of the legacy solution: it reduces per‑machine dependencies, centralizes configuration, provides real‑time ingestion with traffic shaping, and offers a clear path for log replay. The complete source code is hosted on GitHub for further exploration.
Source: http://www.cnblogs.com/zingp/p/9365010.html
