How to Build a Go Log Collector with etcd, Context, and Kafka Integration

This article walks through redesigning a Go‑based log‑collection framework, introducing etcd for distributed configuration, demonstrating context for timeout and data propagation, and showing how to integrate Kafka consumers while improving concurrency handling and adding rate‑limiting mechanisms.

MaGe Linux Operations

Log Collection System Overview

The article reorganizes the architecture of a log‑collection system, illustrating the overall logic with diagrams.

etcd Introduction

etcd is a highly available, distributed key‑value store written in Go, well suited to configuration sharing and service discovery. It keeps its replicas strongly consistent with the Raft consensus algorithm and exposes a simple API (HTTP/JSON in v2; gRPC, with an HTTP/JSON gateway, in v3). Similar projects include ZooKeeper and Consul. Typical uses:

Service discovery and registration

Configuration center (used by the log‑collection client)

Distributed lock

Leader election

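Installation: download the release for your platform from the etcd GitHub releases page, start the server, and verify it with etcdctl:

./etcdctl set name zhaofan
zhaofan
./etcdctl get name
zhaofan

The set/get commands above use the etcd v2 API. With the v3 API (the etcdctl default since etcd 3.4, and the API used by the Go clientv3 examples in this article), the equivalents are "etcdctl put name zhaofan", which prints OK, and "etcdctl get name", which prints the key followed by its value. The v2 and v3 keyspaces are separate, so a key written through one API is not visible through the other.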

Context Overview and Usage

In Go, the context package serves two main purposes: propagating cancellation and deadlines (timeouts) to goroutines, and carrying request‑scoped values such as a trace ID across function calls.

Example of using context.WithTimeout to limit an HTTP request:

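package main

import (
    "context"
    "fmt"
    "io"
    "net/http"
    "time"
)

// Result carries the outcome of the HTTP call back to the caller.
type Result struct {
    r   *http.Response
    err error
}

func process() {
    // The request is cancelled automatically after 2 seconds.
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()

    client := &http.Client{}
    // Buffered so the goroutine can always send, even after a timeout.
    c := make(chan Result, 1)

    // Binding ctx to the request lets the transport abort it when ctx expires,
    // instead of calling the deprecated Transport.CancelRequest (Go 1.13+).
    req, err := http.NewRequestWithContext(ctx, "GET", "http://www.google.com", nil)
    if err != nil {
        fmt.Println("http request failed, err:", err)
        return
    }
    go func() {
        resp, err := client.Do(req)
        c <- Result{resp, err}
    }()

    select {
    case <-ctx.Done():
        fmt.Println("timeout!")
    case res := <-c:
        // res.r is nil when the request failed, so check the error first.
        if res.err != nil {
            fmt.Println("request failed, err:", res.err)
            return
        }
        defer res.r.Body.Close()
        out, _ := io.ReadAll(res.r.Body) // io.ReadAll replaces ioutil.ReadAll (Go 1.16+)
        fmt.Printf("server response: %s\n", out)
    }
}

func main() { process() }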

Another example stores a trace ID in the context and retrieves it in nested functions:

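package main

import (
    "context" // standard library since Go 1.7; no third-party mirror needed
    "fmt"
)

// An unexported key type avoids collisions with keys set by other packages
// (go vet warns about plain string keys).
type ctxKey string

const traceIDKey ctxKey = "trace_id"

func add(ctx context.Context, a, b int) int {
    // The comma-ok form avoids a panic if the value is missing.
    traceID, _ := ctx.Value(traceIDKey).(string)
    fmt.Printf("trace_id:%v\n", traceID)
    return a + b
}

func calc(ctx context.Context, a, b int) int {
    traceID, _ := ctx.Value(traceIDKey).(string)
    fmt.Printf("trace_id:%v\n", traceID)
    return add(ctx, a, b)
}

func main() {
    ctx := context.WithValue(context.Background(), traceIDKey, "123456")
    calc(ctx, 20, 30)
}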

Combining etcd and Context

Connecting to etcd, then storing and retrieving a value, with a separate timeout for each operation:

package main

import (
    "context"
    "fmt"
    "time"

    // Module path from etcd 3.5 on; older releases used github.com/coreos/etcd/clientv3.
    clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
    // Replace with your etcd client endpoint (etcd listens on 2379 by default).
    cli, err := clientv3.New(clientv3.Config{
        Endpoints:   []string{"192.168.0.118:2371"},
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        fmt.Println("connect failed, err:", err)
        return
    }
    fmt.Println("connect success")
    defer cli.Close()

    // Each operation gets its own 1-second timeout; cancel releases it right away.
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    _, err = cli.Put(ctx, "logagent/conf/", "sample_value")
    cancel()
    if err != nil {
        fmt.Println("put failed, err:", err)
        return
    }

    ctx, cancel = context.WithTimeout(context.Background(), time.Second)
    resp, err := cli.Get(ctx, "logagent/conf/")
    cancel()
    if err != nil {
        fmt.Println("get failed, err:", err)
        return
    }
    for _, ev := range resp.Kvs {
        fmt.Printf("%s:%s\n", ev.Key, ev.Value)
    }
}

Watching for configuration changes:

package main

import (
    "context"
    "fmt"
    "time"

    // Module path from etcd 3.5 on; older releases used github.com/coreos/etcd/clientv3.
    clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
    cli, err := clientv3.New(clientv3.Config{
        Endpoints:   []string{"192.168.0.118:2371"},
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        fmt.Println("connect failed, err:", err)
        return
    }
    defer cli.Close()

    // A watch is long-lived, so it uses a context without a timeout.
    rch := cli.Watch(context.Background(), "logagent/conf/")
    for wresp := range rch {
        for _, ev := range wresp.Events {
            // ev.Type is PUT or DELETE.
            fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
        }
    }
}

Kafka Consumer Example

A basic consumer using Sarama, which at first relies on time.Sleep to keep main (and therefore the consumer goroutines) alive:

package main

import (
    "fmt"
    "strings"
    "time"

    // Sarama is now published as github.com/IBM/sarama; older releases use this path.
    "github.com/Shopify/sarama"
)

func main() {
    consumer, err := sarama.NewConsumer(strings.Split("192.168.0.118:9092", ","), nil)
    if err != nil {
        fmt.Println("failed to start consumer:", err)
        return
    }
    // Deferred first, so it runs last, after the partition consumers are closed.
    defer consumer.Close()

    partitionList, err := consumer.Partitions("nginx_log")
    if err != nil {
        fmt.Println("failed to get the list of partitions:", err)
        return
    }
    fmt.Println(partitionList)
    for _, partition := range partitionList {
        // One PartitionConsumer per partition, starting from the newest offset.
        pc, err := consumer.ConsumePartition("nginx_log", partition, sarama.OffsetNewest)
        if err != nil {
            fmt.Printf("failed to start consumer for partition %d: %s\n", partition, err)
            return
        }
        defer pc.AsyncClose()
        go func(partitionConsumer sarama.PartitionConsumer) {
            for msg := range partitionConsumer.Messages() {
                fmt.Printf("partition:%d Offset:%d Key:%s Value:%s\n",
                    msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
            }
        }(pc)
    }
    // Crude: only keeps main from exiting so the goroutines can run.
    time.Sleep(time.Hour)
}

An improved version replaces time.Sleep with a sync.WaitGroup so that main waits for all consumer goroutines:

package main

import (
    "fmt"
    "strings"
    "sync"

    "github.com/Shopify/sarama"
)

var wg sync.WaitGroup

func main() {
    consumer, err := sarama.NewConsumer(strings.Split("192.168.0.118:9092", ","), nil)
    if err != nil {
        fmt.Println("failed to start consumer:", err)
        return
    }
    // Deferred first, so it runs last, after the partition consumers are closed.
    defer consumer.Close()

    partitionList, err := consumer.Partitions("nginx_log")
    if err != nil {
        fmt.Println("failed to get the list of partitions:", err)
        return
    }
    fmt.Println(partitionList)
    for _, partition := range partitionList {
        pc, err := consumer.ConsumePartition("nginx_log", partition, sarama.OffsetNewest)
        if err != nil {
            fmt.Printf("failed to start consumer for partition %d: %s\n", partition, err)
            return
        }
        defer pc.AsyncClose()
        // Add must run before the goroutine starts; calling it inside the
        // goroutine races with wg.Wait below.
        wg.Add(1)
        go func(partitionConsumer sarama.PartitionConsumer) {
            defer wg.Done()
            for msg := range partitionConsumer.Messages() {
                fmt.Printf("partition:%d Offset:%d Key:%s Value:%s\n",
                    msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
            }
        }(pc)
    }
    // Blocks until every Messages channel is closed (in practice, until the process stops).
    wg.Wait()
}

Storing Log Configuration in etcd

Functions that connect to etcd, load the current log configuration for each key, watch those keys for changes, and expose the updates on a channel:

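package main

import (
    "context"
    "fmt"
    "sync"
    "time"

    "github.com/astaxie/beego/logs"
    clientv3 "go.etcd.io/etcd/client/v3"
)

var (
    Client      *clientv3.Client
    logConfChan chan string
    // ipArrays holds this host's IP addresses. It is not defined in this
    // excerpt and is assumed to be filled in elsewhere before initEtcd runs.
    ipArrays []string
    // waitGroup keeps the program running while the watcher is active.
    waitGroup sync.WaitGroup
)

// initEtcd connects to etcd, loads the current value of every per-IP key
// (keyfmt contains one %s for the IP), and starts watching those keys.
func initEtcd(addr []string, keyfmt string, timeout time.Duration) (err error) {
    var keys []string
    for _, ip := range ipArrays {
        keys = append(keys, fmt.Sprintf(keyfmt, ip))
    }
    // Buffered so the initial reads do not block before a reader is attached
    // (sends block once 10 updates are pending).
    logConfChan = make(chan string, 10)
    logs.Debug("etcd watch key:%v timeout:%v", keys, timeout)

    Client, err = clientv3.New(clientv3.Config{Endpoints: addr, DialTimeout: timeout})
    if err != nil {
        logs.Error("connect failed, err:%v", err)
        return
    }
    logs.Debug("init etcd success")

    for _, key := range keys {
        ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
        resp, err := Client.Get(ctx, key)
        cancel()
        if err != nil {
            logs.Warn("get key %s failed, err:%v", key, err)
            continue
        }
        for _, ev := range resp.Kvs {
            logs.Debug("%q : %q", ev.Key, ev.Value)
            logConfChan <- string(ev.Value)
        }
    }

    waitGroup.Add(1)
    go WatchEtcd(keys)
    return
}

// WatchEtcd forwards every change on the watched keys to logConfChan.
// Each key gets its own goroutine ranging over its watch channel, so no
// polling or sleeping is needed.
func WatchEtcd(keys []string) {
    defer waitGroup.Done()
    var wg sync.WaitGroup
    for _, key := range keys {
        wg.Add(1)
        go func(key string) {
            defer wg.Done()
            for wresp := range Client.Watch(context.Background(), key) {
                for _, ev := range wresp.Events {
                    logs.Debug("%s %q : %q", ev.Type, ev.Kv.Key, ev.Kv.Value)
                    // DELETE events carry an empty value.
                    logConfChan <- string(ev.Kv.Value)
                }
            }
        }(key)
    }
    wg.Wait()
}

// GetLogConf exposes the configuration-update channel to other components.
func GetLogConf() chan string { return logConfChan }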

Rate‑Limiting Helper

A simple per‑second limiter to avoid overloading the business system:

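package main

import (
    "sync"
    "time"

    "github.com/astaxie/beego/logs"
)

// SecondLimit is a fixed-window limiter: at most limit events per wall-clock second.
// A mutex guards the fields; mixing atomic and plain reads of them was a data race.
type SecondLimit struct {
    mu         sync.Mutex
    unixSecond int64 // the second the current window belongs to
    curCount   int32 // events recorded in that second
    limit      int32
}

func NewSecondLimit(limit int32) *SecondLimit {
    return &SecondLimit{unixSecond: time.Now().Unix(), limit: limit}
}

// Add records count events in the current one-second window.
func (s *SecondLimit) Add(count int) {
    s.mu.Lock()
    defer s.mu.Unlock()
    if sec := time.Now().Unix(); sec != s.unixSecond {
        // A new second has started: reset the window.
        s.unixSecond = sec
        s.curCount = 0
    }
    s.curCount += int32(count)
}

// Wait blocks while the current second's budget is used up; the bool result
// is always false. Wait and Add are separate calls, so concurrent callers can
// briefly overshoot the limit.
func (s *SecondLimit) Wait() bool {
    for {
        s.mu.Lock()
        if sec := time.Now().Unix(); sec != s.unixSecond {
            s.unixSecond = sec
            s.curCount = 0
        }
        cur := s.curCount
        s.mu.Unlock()
        // >= rather than ==, because Add(n) can push the count past the limit.
        if cur < s.limit {
            logs.Debug("limit is exited")
            return false
        }
        logs.Debug("limit is running, limit:%d curCount:%d", s.limit, cur)
        time.Sleep(time.Millisecond) // poll until the next second begins
    }
}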

Conclusion

The first half of the log‑collection pipeline—framework design, etcd configuration management, context handling, Kafka consumption, and rate limiting—has been implemented. The next steps will push logs to Elasticsearch and render them on a web interface.
