Design and Implementation of a Real‑Time Log Collection System Using Go, Etcd, and Kafka
This article describes the shortcomings of a legacy log‑collection architecture and proposes a streamlined real‑time design: configuration is centralised in Etcd, a single Go‑based logagent tails the log files and applies per‑service rate limits, and the logs are forwarded to Kafka for downstream processing.
The author begins by criticizing the existing log‑collection system used in their company, which required multiple agents per host, separate Hadoop clients, and different collection frequencies (hourly, every five minutes, and real‑time), leading to high operational overhead, configuration sprawl, and severe load spikes at the top of each hour.
To address these issues, a new architecture is introduced: a single real‑time collection mode, removal of Hadoop client deployment from each host, a unified configuration service backed by Etcd with a web UI, traffic shaping via per‑service rate limits, and a log‑replay mechanism. The overall flow is illustrated with an architecture diagram (image omitted for brevity).
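The data path described above (several tailers feeding one buffered queue that a fixed pool of senders drains) can be sketched with nothing but channels and goroutines. This is an illustrative sketch, not the article's code: `message`, `runPipeline`, and the in-memory `delivered` map are hypothetical names, and the map stands in for Kafka so the example runs without a broker.

```go
package main

import (
	"fmt"
	"sync"
)

// message pairs a log line with its destination topic (hypothetical type).
type message struct {
	line  string
	topic string
}

// runPipeline fans lines from several sources into one buffered channel and
// drains it with a fixed number of sender workers. It returns how many
// messages were delivered per topic. The map is a stand-in for Kafka.
func runPipeline(sources map[string][]string, workers int) map[string]int {
	queue := make(chan message, 16)
	var producers, senders sync.WaitGroup
	var mu sync.Mutex
	delivered := make(map[string]int)

	// Sender workers: each one drains the shared queue until it is closed.
	for i := 0; i < workers; i++ {
		senders.Add(1)
		go func() {
			defer senders.Done()
			for m := range queue {
				mu.Lock()
				delivered[m.topic]++
				mu.Unlock()
			}
		}()
	}

	// One producer per source, playing the role of a file tailer.
	for topic, lines := range sources {
		producers.Add(1)
		go func(topic string, lines []string) {
			defer producers.Done()
			for _, l := range lines {
				queue <- message{line: l, topic: topic}
			}
		}(topic, lines)
	}

	producers.Wait() // all lines are queued
	close(queue)     // lets the senders finish their range loops
	senders.Wait()
	return delivered
}

func main() {
	delivered := runPipeline(map[string][]string{
		"nginx_log": {"GET /a", "GET /b", "GET /c"},
		"app_log":   {"started"},
	}, 4)
	fmt.Println(delivered["nginx_log"], delivered["app_log"]) // prints: 3 1
}
```

Because the queue is bounded, a slow sender pool blocks the producers instead of letting memory grow, which is the same back-pressure a buffered channel gives a real agent.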
Configuration Design
The logagent reads its settings from an INI file and from Etcd. Example configuration file content:
etcd_addr = 10.134.123.183:2379 # etcd address
etcd_timeout = 5 # connection timeout
etcd_watch_key = /logagent/%s/logconfig # key pattern
kafka_addr = 10.134.123.183:9092 # kafka address
thread_num = 4 # number of worker threads
log = ./log/logagent.log # agent log file
level = debug # log level
# The list of services to watch, their log paths, topics and rate limits can be stored in Etcd as JSON, e.g.:
[
  {"service":"test_service","log_path":"/search/nginx/logs/ping-android.shouji.sogou.com_access_log","topic":"nginx_log","send_rate":1000},
  {"service":"srv.android.shouji.sogou.com","log_path":"/search/nginx/logs/srv.android.shouji.sogou.com_access_log","topic":"nginx_log","send_rate":2000}
]

The Go code that parses the INI file is shown below:
package main

import (
	"fmt"

	"github.com/astaxie/beego/config"
)

// AppConfig holds the settings read from the INI file.
type AppConfig struct {
	EtcdAddr     string
	EtcdTimeOut  int
	EtcdWatchKey string
	KafkaAddr    string
	ThreadNum    int
	LogFile      string
	LogLevel     string
}

var appConf = &AppConfig{}

// initConfig loads the INI file into appConf, falling back to defaults
// for etcd_timeout and thread_num when they are missing.
func initConfig(file string) (err error) {
	conf, err := config.NewConfig("ini", file)
	if err != nil {
		fmt.Println("new config failed, err:", err)
		return
	}
	appConf.EtcdAddr = conf.String("etcd_addr")
	appConf.EtcdTimeOut = conf.DefaultInt("etcd_timeout", 5)
	appConf.EtcdWatchKey = conf.String("etcd_watch_key")
	appConf.KafkaAddr = conf.String("kafka_addr")
	appConf.ThreadNum = conf.DefaultInt("thread_num", 4)
	appConf.LogFile = conf.String("log")
	appConf.LogLevel = conf.String("level")
	return
}

Etcd integration consists of an initial pull of the local host's configuration and a long‑running watcher that pushes any changes onto a channel for the logagent to consume:
package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/astaxie/beego/logs"
	client "github.com/coreos/etcd/clientv3"
)

var (
	confChan  = make(chan string, 10) // carries updated config payloads to the logagent
	cli       *client.Client
	waitGroup sync.WaitGroup
)

// initEtcd connects to Etcd and starts the background watcher.
func initEtcd(addr []string, keyFormat string, timeout time.Duration) (err error) {
	cli, err = client.New(client.Config{Endpoints: addr, DialTimeout: timeout})
	if err != nil {
		fmt.Println("connect etcd error:", err)
		return
	}
	logs.Debug("init etcd success")
	// pull existing keys
	// ... omitted for brevity ...
	waitGroup.Add(1)
	go etcdWatch([]string{keyFormat})
	return
}

// etcdWatch checks every watch channel once per second and forwards each
// changed value to confChan.
func etcdWatch(keys []string) {
	defer waitGroup.Done()
	var watchChans []client.WatchChan
	for _, key := range keys {
		watchChans = append(watchChans, cli.Watch(context.Background(), key))
	}
	for {
		for _, watchC := range watchChans {
			select {
			case wresp := <-watchC:
				for _, ev := range wresp.Events {
					confChan <- string(ev.Kv.Value)
					logs.Debug("etcd key = %s, etcd value = %s", ev.Kv.Key, ev.Kv.Value)
				}
			default:
				// nothing pending on this channel; move on to the next one
			}
		}
		time.Sleep(time.Second)
	}
}

The logagent uses a TailMgr to manage multiple TailObj instances, each responsible for a single log file. The manager can add new files, reload configuration when Etcd pushes updates, and remove files that are no longer needed.
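The reload step is described but its code is not part of the article's listings. A minimal sketch of how it could work, assuming the JSON layout from the sample configuration: decode the payload pushed by Etcd, then compare it with the paths already being tailed. The `LogConfig` struct tags and the `diffConfigs` helper are assumptions introduced for illustration; only the field names `LogPath`, `Topic`, and `SendRate` appear in the article's code.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sort"
)

// LogConfig mirrors one entry of the JSON list stored in Etcd.
// The struct tags are an assumption based on the sample JSON.
type LogConfig struct {
	Service  string `json:"service"`
	LogPath  string `json:"log_path"`
	Topic    string `json:"topic"`
	SendRate int    `json:"send_rate"`
}

// diffConfigs (hypothetical helper) compares the paths currently being tailed
// with a freshly received JSON payload. It reports which entries should start
// being tailed and which paths should stop.
func diffConfigs(current map[string]LogConfig, payload string) (toAdd []LogConfig, toRemove []string, err error) {
	var next []LogConfig
	if err = json.Unmarshal([]byte(payload), &next); err != nil {
		return nil, nil, err
	}
	wanted := make(map[string]bool, len(next))
	for _, c := range next {
		wanted[c.LogPath] = true
		if _, ok := current[c.LogPath]; !ok {
			toAdd = append(toAdd, c)
		}
	}
	for path := range current {
		if !wanted[path] {
			toRemove = append(toRemove, path)
		}
	}
	sort.Strings(toRemove) // map iteration order is random; sort for stable output
	return toAdd, toRemove, nil
}

func main() {
	current := map[string]LogConfig{
		"/var/log/a.log": {Service: "a", LogPath: "/var/log/a.log", Topic: "nginx_log", SendRate: 1000},
		"/var/log/b.log": {Service: "b", LogPath: "/var/log/b.log", Topic: "nginx_log", SendRate: 1000},
	}
	payload := `[
		{"service":"a","log_path":"/var/log/a.log","topic":"nginx_log","send_rate":1000},
		{"service":"c","log_path":"/var/log/c.log","topic":"nginx_log","send_rate":2000}
	]`
	toAdd, toRemove, err := diffConfigs(current, payload)
	if err != nil {
		fmt.Println("bad payload:", err)
		return
	}
	fmt.Println("add:", len(toAdd), "remove:", toRemove)
}
```

In the agent, each entry in `toAdd` would presumably go to `AddLogFile`, and each path in `toRemove` would be stopped by signalling that TailObj's `exitChan`. This sketch keys on the log path only, so an entry whose topic or send rate changed for an existing path is not detected; handling that would require comparing the full struct.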
// TailObj tails a single log file.
type TailObj struct {
	tail     *tail.Tail
	offset   int64
	logConf  LogConfig
	secLimit *SecondLimit
	exitChan chan bool
}

// TailMgr tracks one TailObj per log path.
type TailMgr struct {
	tailObjMap map[string]*TailObj
	lock       sync.Mutex
}

func NewTailMgr() *TailMgr {
	return &TailMgr{tailObjMap: make(map[string]*TailObj, 16)}
}

// AddLogFile starts tailing conf.LogPath unless it is already being tailed.
func (t *TailMgr) AddLogFile(conf LogConfig) (err error) {
	t.lock.Lock()
	defer t.lock.Unlock()
	if _, ok := t.tailObjMap[conf.LogPath]; ok {
		return fmt.Errorf("duplicate filename:%s", conf.LogPath)
	}
	// Whence 2 (io.SeekEnd) starts at the end of the file, so only new lines are read.
	tailer, err := tail.TailFile(conf.LogPath, tail.Config{
		ReOpen:    true,
		Follow:    true,
		Location:  &tail.SeekInfo{Offset: 0, Whence: 2},
		MustExist: false,
		Poll:      true,
	})
	if err != nil {
		return err
	}
	tailObj := &TailObj{
		tail:     tailer,
		offset:   0,
		logConf:  conf,
		secLimit: NewSecondLimit(int32(conf.SendRate)),
		exitChan: make(chan bool, 1),
	}
	t.tailObjMap[conf.LogPath] = tailObj
	waitGroup.Add(1)
	go tailObj.readLog()
	return
}

Each TailObj reads lines from its file, applies the per‑service rate limit via a SecondLimit helper, and forwards the line to the Kafka sender:
func (t *TailObj) readLog() {
	// Deferred so the WaitGroup is released on every exit path, including the early return.
	defer waitGroup.Done()
	for {
		select {
		case <-t.exitChan:
			logs.Warn("tail obj is exited: config: %v", t.logConf)
			return
		case line, ok := <-t.tail.Lines:
			if !ok {
				return // Lines was closed, so the tailer has stopped
			}
			if line.Err != nil {
				continue
			}
			lineStr := strings.TrimSpace(line.Text)
			if len(lineStr) == 0 {
				continue
			}
			kafkaSend.addMessage(lineStr, t.logConf.Topic)
			t.secLimit.Add(1)
			t.secLimit.Wait()
		}
	}
}

The rate‑limiting component tracks the number of messages sent in the current second and sleeps in millisecond intervals when the configured limit is reached:
// SecondLimit counts the messages sent during the current wall-clock second.
type SecondLimit struct {
	unixSecond int64
	curCount   int32
	limit      int32
}

func NewSecondLimit(limit int32) *SecondLimit {
	return &SecondLimit{unixSecond: time.Now().Unix(), curCount: 0, limit: limit}
}

// Add records count messages, starting a fresh counter when the second has rolled over.
func (s *SecondLimit) Add(count int) {
	sec := time.Now().Unix()
	if sec == atomic.LoadInt64(&s.unixSecond) {
		atomic.AddInt32(&s.curCount, int32(count))
		return
	}
	atomic.StoreInt64(&s.unixSecond, sec)
	atomic.StoreInt32(&s.curCount, int32(count))
}

// Wait blocks, sleeping one millisecond at a time, while the limit for the
// current second is exhausted. It always returns false.
func (s *SecondLimit) Wait() bool {
	for {
		sec := time.Now().Unix()
		if sec == atomic.LoadInt64(&s.unixSecond) && atomic.LoadInt32(&s.curCount) >= s.limit {
			time.Sleep(time.Millisecond)
			continue
		}
		if sec != atomic.LoadInt64(&s.unixSecond) {
			atomic.StoreInt64(&s.unixSecond, sec)
			atomic.StoreInt32(&s.curCount, 0)
		}
		return false
	}
}

The Kafka sender creates a synchronous producer and starts a configurable number of worker goroutines; each worker pulls messages from a buffered channel and sends them to the appropriate topic:
// Message is one log line bound for a Kafka topic.
type Message struct {
	line  string
	topic string
}

type KafkaSend struct {
	client   sarama.SyncProducer
	lineChan chan *Message
}

func NewKafkaSend(kafkaAddr string, threadNum int) (kafka *KafkaSend, err error) {
	kafka = &KafkaSend{lineChan: make(chan *Message, 10000)}
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	config.Producer.Return.Successes = true // required by SyncProducer
	client, err := sarama.NewSyncProducer([]string{kafkaAddr}, config)
	if err != nil {
		logs.Error("init kafka client err: %v", err)
		return
	}
	kafka.client = client
	for i := 0; i < threadNum; i++ {
		waitGroup.Add(1)
		go kafka.sendMsgToKfk()
	}
	return
}

func (k *KafkaSend) sendMsgToKfk() {
	defer waitGroup.Done()
	for v := range k.lineChan {
		msg := &sarama.ProducerMessage{Topic: v.topic, Value: sarama.StringEncoder(v.line)}
		if _, _, err := k.client.SendMessage(msg); err != nil {
			// Log and keep the worker alive. Returning here would stop this
			// worker for good, and once every worker had exited lineChan
			// would fill up and block the tailers.
			logs.Error("send message to kafka error: %v", err)
			continue
		}
	}
}

func (k *KafkaSend) addMessage(line, topic string) error {
	k.lineChan <- &Message{line: line, topic: topic}
	return nil
}

Finally, the runServer function boots the TailMgr, starts processing Etcd updates, and blocks until all goroutines finish.
func runServer() {
	tailMgr = NewTailMgr()
	tailMgr.Process()
	waitGroup.Wait()
}

The article concludes by noting that the complete source code is available on GitHub (https://github.com/zingp/logagent) and invites readers to explore the repository for further details.
