Design and Implementation of a Real‑Time Log Collection System Using Go, Etcd, and Kafka

This article describes the shortcomings of a legacy log‑collection architecture, proposes a streamlined real‑time design that centralises configuration in Etcd, uses a single Go‑based logagent to tail files, applies per‑service rate limiting, and forwards logs to Kafka for downstream processing.

Architecture Digest

The author begins by criticizing the existing log‑collection system used in their company, which required multiple agents per host, separate Hadoop clients, and different collection frequencies (hourly, every five minutes, and real‑time), leading to high operational overhead, configuration sprawl, and severe load spikes at the top of each hour.

To address these issues, a new architecture is introduced: a single real‑time collection mode, removal of Hadoop client deployment from each host, a unified configuration service backed by Etcd with a web UI, traffic shaping via per‑service rate limits, and a log‑replay mechanism. The overall flow is illustrated with an architecture diagram (image omitted for brevity).

Configuration Design

The logagent reads its settings from an INI file and from Etcd. Example configuration file content:

# etcd address
etcd_addr = 10.134.123.183:2379
# etcd connection timeout
etcd_timeout = 5
# key pattern
etcd_watch_key = /logagent/%s/logconfig

# kafka address
kafka_addr = 10.134.123.183:9092

# number of sender goroutines
thread_num = 4
# agent log file
log = ./log/logagent.log
# log level
level = debug

The list of services to watch, with their log paths, topics and rate limits, is not part of the INI file. It can be stored in Etcd as a JSON array, e.g.:
[
  {"service":"test_service","log_path":"/search/nginx/logs/ping-android.shouji.sogou.com_access_log","topic":"nginx_log","send_rate":1000},
  {"service":"srv.android.shouji.sogou.com","log_path":"/search/nginx/logs/srv.android.shouji.sogou.com_access_log","topic":"nginx_log","send_rate":2000}
]
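
The later snippets refer to a LogConfig type that is not shown in this article. A minimal sketch of how the JSON above might be decoded into it follows. The struct tags are inferred from the sample, and parseLogConfigs with its validation rules is a hypothetical helper, not code from the original project.

```go
package main

import (
    "encoding/json"
    "fmt"
)

// LogConfig mirrors one entry of the JSON array stored in Etcd.
// Field names and tags are inferred from the sample (assumption).
type LogConfig struct {
    Service  string `json:"service"`
    LogPath  string `json:"log_path"`
    Topic    string `json:"topic"`
    SendRate int    `json:"send_rate"`
}

// parseLogConfigs decodes the raw Etcd value and rejects entries that
// could not be collected: no path, no topic, or a non-positive rate.
func parseLogConfigs(raw string) ([]LogConfig, error) {
    var confs []LogConfig
    if err := json.Unmarshal([]byte(raw), &confs); err != nil {
        return nil, fmt.Errorf("unmarshal log config: %w", err)
    }
    for i, c := range confs {
        if c.LogPath == "" || c.Topic == "" {
            return nil, fmt.Errorf("entry %d: log_path and topic are required", i)
        }
        if c.SendRate <= 0 {
            return nil, fmt.Errorf("entry %d: send_rate must be positive", i)
        }
    }
    return confs, nil
}
```

Validating at decode time means a malformed push from the web UI can be rejected as a whole instead of leaving the agent with a half-applied configuration.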

The Go code that parses the INI file is shown below:

package main

import (
    "fmt"
    "github.com/astaxie/beego/config"
)

type AppConfig struct {
    EtcdAddr      string
    EtcdTimeOut   int
    EtcdWatchKey  string
    KafkaAddr     string
    ThreadNum     int
    LogFile       string
    LogLevel      string
}

var appConf = &AppConfig{}

func initConfig(file string) (err error) {
    conf, err := config.NewConfig("ini", file)
    if err != nil {
        fmt.Println("new config failed, err:", err)
        return
    }
    appConf.EtcdAddr = conf.String("etcd_addr")
    appConf.EtcdTimeOut = conf.DefaultInt("etcd_timeout", 5)
    appConf.EtcdWatchKey = conf.String("etcd_watch_key")
    appConf.KafkaAddr = conf.String("kafka_addr")
    appConf.ThreadNum = conf.DefaultInt("thread_num", 4)
    appConf.LogFile = conf.String("log")
    appConf.LogLevel = conf.String("level")
    return
}

Etcd integration consists of an initial pull of configuration for the local host and a long‑running watcher that pushes any changes onto a channel for the logagent to consume:

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
    "github.com/astaxie/beego/logs"
    client "github.com/coreos/etcd/clientv3"
)

var (
    confChan = make(chan string, 10)
    cli      *client.Client
    waitGroup sync.WaitGroup
)

func initEtcd(addr []string, keyFormat string, timeout time.Duration) (err error) {
    cli, err = client.New(client.Config{Endpoints: addr, DialTimeout: timeout})
    if err != nil {
        fmt.Println("connect etcd error:", err)
        return
    }
    logs.Debug("init etcd success")
    // pull existing keys
    // ... omitted for brevity ...
    waitGroup.Add(1)
    go etcdWatch([]string{keyFormat})
    return
}

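// etcdWatch opens one watch per key and forwards every changed value to confChan.
func etcdWatch(keys []string) {
    defer waitGroup.Done()
    var watchChans []client.WatchChan
    for _, key := range keys {
        watchChans = append(watchChans, cli.Watch(context.Background(), key))
    }
    for {
        // Poll each watch channel without blocking, so one quiet key
        // does not hold up the others.
        for _, watchC := range watchChans {
            select {
            case wresp := <-watchC:
                for _, ev := range wresp.Events {
                    confChan <- string(ev.Kv.Value)
                    logs.Debug("etcd key = %s, etcd value = %s", ev.Kv.Key, ev.Kv.Value)
                }
            default:
            }
        }
        // Pause between polling rounds to avoid a busy loop.
        time.Sleep(time.Second)
    }
}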
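
initEtcd takes a slice of endpoints, a time.Duration and a key, while AppConfig holds a string, an int and a pattern containing %s. The conversion is not shown in the article, so the following is only a sketch under stated assumptions: etcd_addr may hold a comma-separated list, etcd_timeout is in seconds, and the %s placeholder is filled with a host identifier such as the local IP. The helper name etcdParams is hypothetical.

```go
package main

import (
    "fmt"
    "strings"
    "time"
)

// etcdParams converts raw INI values into the argument types initEtcd expects.
// Assumptions: addr is comma-separated, timeoutSec is in seconds, and the
// %s in keyPattern stands for a host identifier.
func etcdParams(addr string, timeoutSec int, keyPattern, host string) ([]string, time.Duration, string) {
    var endpoints []string
    for _, ep := range strings.Split(addr, ",") {
        if ep = strings.TrimSpace(ep); ep != "" {
            endpoints = append(endpoints, ep)
        }
    }
    timeout := time.Duration(timeoutSec) * time.Second
    key := fmt.Sprintf(keyPattern, host)
    return endpoints, timeout, key
}
```

With the sample configuration and a host of 10.0.0.7, the key becomes /logagent/10.0.0.7/logconfig, which gives each machine its own configuration entry in Etcd.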

The logagent uses a TailMgr to manage multiple TailObj instances, each responsible for a single log file. The manager can add new files, reload configuration when Etcd pushes updates, and remove files that are no longer needed.

type TailObj struct {
    tail      *tail.Tail
    offset    int64
    logConf   LogConfig
    secLimit  *SecondLimit
    exitChan  chan bool
}

type TailMgr struct {
    tailObjMap map[string]*TailObj
    lock       sync.Mutex
}

func NewTailMgr() *TailMgr {
    return &TailMgr{tailObjMap: make(map[string]*TailObj, 16)}
}

// AddLogFile starts tailing conf.LogPath unless that path is already tracked.
func (t *TailMgr) AddLogFile(conf LogConfig) (err error) {
    t.lock.Lock()
    defer t.lock.Unlock()
    if _, ok := t.tailObjMap[conf.LogPath]; ok {
        return fmt.Errorf("duplicate filename:%s", conf.LogPath)
    }
    tailer, err := tail.TailFile(conf.LogPath, tail.Config{
        ReOpen:    true, // reopen the file after rotation
        Follow:    true,
        Location:  &tail.SeekInfo{Offset: 0, Whence: 2}, // whence 2: start at the end of the file
        MustExist: false,                                // do not fail if the file is missing
        Poll:      true,                                 // poll for changes instead of using inotify
    })
    if err != nil {
        return err
    }
    tailObj := &TailObj{tail: tailer, offset: 0, logConf: conf, secLimit: NewSecondLimit(int32(conf.SendRate)), exitChan: make(chan bool, 1)}
    t.tailObjMap[conf.LogPath] = tailObj
    waitGroup.Add(1)
    go tailObj.readLog()
    return
}
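
The reload step is described but not shown. One way to approach it is to compare the running set with the freshly pushed list and derive what to start, update and stop. The sketch below is an illustration under assumptions: diffConfigs is a hypothetical helper, and LogConfig is redeclared here with assumed fields only so that the example is self-contained.

```go
package main

// LogConfig is a stand-in for the type used by TailMgr (fields assumed).
type LogConfig struct {
    Service  string
    LogPath  string
    Topic    string
    SendRate int
}

// diffConfigs compares the running set, keyed by log path like tailObjMap,
// with a pushed list and reports what to start, what to update and what to stop.
func diffConfigs(running map[string]LogConfig, pushed []LogConfig) (add, update []LogConfig, remove []string) {
    seen := make(map[string]bool, len(pushed))
    for _, c := range pushed {
        seen[c.LogPath] = true
        old, ok := running[c.LogPath]
        switch {
        case !ok:
            add = append(add, c) // new path: start a tailer
        case old != c:
            update = append(update, c) // same path, changed topic or rate
        }
    }
    for path := range running {
        if !seen[path] {
            remove = append(remove, path) // no longer configured: stop the tailer
        }
    }
    return add, update, remove
}
```

The manager could then call AddLogFile for each entry in add and signal exitChan for each path in remove while holding its lock.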

Each TailObj reads lines from its file, applies the per‑service rate limit via a SecondLimit helper, and forwards the line to the Kafka sender:

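// readLog forwards each non-empty line to Kafka, pacing itself with the rate limiter.
func (t *TailObj) readLog() {
    // Deferred so the WaitGroup is released on every exit path,
    // including the early return when exitChan fires.
    defer waitGroup.Done()
    for line := range t.tail.Lines {
        if line.Err != nil {
            continue
        }
        lineStr := strings.TrimSpace(line.Text)
        if len(lineStr) == 0 {
            continue
        }
        kafkaSend.addMessage(lineStr, t.logConf.Topic)
        t.secLimit.Add(1)
        t.secLimit.Wait()
        // Check for a stop request after each line.
        select {
        case <-t.exitChan:
            logs.Warn("tail obj is exited: config:", t.logConf)
            return
        default:
        }
    }
}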
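
The loop above looks at exitChan only after a line has been read, so a tailer on an idle file does not notice a stop request until the next line arrives. A possible variation, sketched here with plain channels instead of the tail library, selects on both sources at once. The name startReader and the done channel are assumptions made for the illustration.

```go
package main

// startReader forwards non-empty lines to send until lines is closed or
// exit is signalled. The returned channel is closed when the goroutine
// ends, which plays the role of waitGroup.Done in the article's code.
func startReader(lines <-chan string, exit <-chan struct{}, send func(string)) <-chan struct{} {
    done := make(chan struct{})
    go func() {
        defer close(done) // runs on every exit path
        for {
            select {
            case <-exit:
                return // stop request, honoured even while the file is idle
            case line, ok := <-lines:
                if !ok {
                    return // the line source was closed
                }
                if line != "" {
                    send(line)
                }
            }
        }
    }()
    return done
}
```

Closing exit stops the reader at once, and receiving from the returned channel tells the caller that the goroutine has finished.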

The rate‑limiting component tracks the number of messages sent in the current second and sleeps in millisecond intervals when the configured limit is reached:

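// SecondLimit counts events within the current wall-clock second.
type SecondLimit struct {
    unixSecond int64 // second the counter belongs to
    curCount   int32 // events recorded in that second
    limit      int32 // maximum events per second
}

func NewSecondLimit(limit int32) *SecondLimit {
    return &SecondLimit{unixSecond: time.Now().Unix(), curCount: 0, limit: limit}
}

// Add records count events, starting a new window if the second has changed.
func (s *SecondLimit) Add(count int) {
    sec := time.Now().Unix()
    if sec == atomic.LoadInt64(&s.unixSecond) {
        atomic.AddInt32(&s.curCount, int32(count))
        return
    }
    atomic.StoreInt64(&s.unixSecond, sec)
    atomic.StoreInt32(&s.curCount, int32(count))
}

// Wait blocks while the quota for the current second is used up.
// It always returns false.
func (s *SecondLimit) Wait() bool {
    for {
        sec := time.Now().Unix()
        if sec == atomic.LoadInt64(&s.unixSecond) && atomic.LoadInt32(&s.curCount) >= s.limit {
            time.Sleep(time.Millisecond)
            continue
        }
        if sec != atomic.LoadInt64(&s.unixSecond) {
            // A new second has started: reset the window.
            atomic.StoreInt64(&s.unixSecond, sec)
            atomic.StoreInt32(&s.curCount, 0)
        }
        return false
    }
}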
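
Because SecondLimit reads the wall clock directly, its window logic is hard to exercise in a test. The sketch below shows the same fixed one-second window idea with the clock injected. It is not the article's implementation: windowLimiter and its Allow method are hypothetical names, it rejects instead of sleeping, and it is not safe for concurrent use.

```go
package main

// windowLimiter is a fixed one-second window counter with an injected clock.
type windowLimiter struct {
    now    func() int64 // returns the current Unix second
    second int64        // second the counter belongs to
    count  int          // events accepted in that second
    limit  int          // maximum events per second
}

func newWindowLimiter(limit int, now func() int64) *windowLimiter {
    return &windowLimiter{now: now, second: now(), limit: limit}
}

// Allow records one event and reports whether it fits in the current window.
func (w *windowLimiter) Allow() bool {
    if sec := w.now(); sec != w.second {
        w.second, w.count = sec, 0 // a new second started: reset the window
    }
    if w.count >= w.limit {
        return false
    }
    w.count++
    return true
}
```

A property of any fixed window is worth keeping in mind: a burst at the end of one second followed by a burst at the start of the next can briefly pass up to twice the configured limit.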

The Kafka sender creates a synchronous producer with a configurable number of worker goroutines; each worker pulls messages from a buffered channel and sends them to the appropriate topic:

type Message struct {
    line  string
    topic string
}

type KafkaSend struct {
    client   sarama.SyncProducer
    lineChan chan *Message
}

func NewKafkaSend(kafkaAddr string, threadNum int) (kafka *KafkaSend, err error) {
    kafka = &KafkaSend{lineChan: make(chan *Message, 10000)}
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Partitioner = sarama.NewRandomPartitioner
    config.Producer.Return.Successes = true
    client, err := sarama.NewSyncProducer([]string{kafkaAddr}, config)
    if err != nil { logs.Error("init kafka client err: %v", err); return }
    kafka.client = client
    for i := 0; i < threadNum; i++ {
        waitGroup.Add(1)
        go kafka.sendMsgToKfk()
    }
    return
}

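// sendMsgToKfk drains lineChan until it is closed.
func (k *KafkaSend) sendMsgToKfk() {
    defer waitGroup.Done()
    for v := range k.lineChan {
        msg := &sarama.ProducerMessage{Topic: v.topic, Value: sarama.StringEncoder(v.line)}
        if _, _, err := k.client.SendMessage(msg); err != nil {
            // Log and keep consuming. If a worker exited here, lineChan
            // would eventually fill up and block every tailer in addMessage.
            logs.Error("send message to kafka error: %v", err)
            continue
        }
    }
}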

func (k *KafkaSend) addMessage(line, topic string) error {
    k.lineChan <- &Message{line: line, topic: topic}
    return nil
}
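
The worker pattern can be tried without a Kafka broker by hiding the producer behind a narrow interface. The following is a sketch under assumptions, not the project's code: the sender interface, sendFunc adapter, pool type and errRejected value are all hypothetical names introduced for the illustration, and a real implementation would wrap sarama.SyncProducer behind sender.

```go
package main

import (
    "errors"
    "sync"
)

// sender is the single operation the workers need (hypothetical seam).
type sender interface {
    Send(topic, line string) error
}

// sendFunc adapts a plain function to the sender interface.
type sendFunc func(topic, line string) error

func (f sendFunc) Send(topic, line string) error { return f(topic, line) }

// errRejected is a sample error that fake senders can return.
var errRejected = errors.New("send rejected")

type message struct{ topic, line string }

// pool fans messages out to n workers and counts failed sends.
type pool struct {
    ch     chan message
    wg     sync.WaitGroup
    mu     sync.Mutex
    failed int
}

func newPool(s sender, n, buf int) *pool {
    p := &pool{ch: make(chan message, buf)}
    for i := 0; i < n; i++ {
        p.wg.Add(1)
        go func() {
            defer p.wg.Done()
            for m := range p.ch {
                if err := s.Send(m.topic, m.line); err != nil {
                    p.mu.Lock()
                    p.failed++ // count the failure and keep consuming
                    p.mu.Unlock()
                }
            }
        }()
    }
    return p
}

// add queues one line; it blocks when the buffer is full.
func (p *pool) add(topic, line string) { p.ch <- message{topic, line} }

// shutdown stops intake, waits for queued sends and returns the failure count.
func (p *pool) shutdown() int {
    close(p.ch)
    p.wg.Wait()
    return p.failed
}
```

Closing the channel in shutdown lets every worker finish its queued messages before the process exits, which is one way to avoid losing buffered lines on a restart.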

Finally, the runServer function boots the TailMgr, starts processing Etcd updates, and blocks until all goroutines finish.

func runServer() {
    tailMgr = NewTailMgr()
    tailMgr.Process()
    waitGroup.Wait()
}

The article concludes by noting that the complete source code is available on GitHub (https://github.com/zingp/logagent) and invites readers to explore the repository for further details.
