How to Build a Go‑Based Log Collection System with etcd, Context, and Kafka
This article walks through designing and implementing a Go log‑collection agent that uses etcd for configuration storage, context for timeout and metadata handling, and Kafka for message consumption, complete with code examples, setup instructions, and a rate‑limiting utility.
System Overview
The author reorganized the log‑collection system architecture (see the architecture diagram in the original article) and outlines the overall logic of the implementation; the full source code is available on GitHub.
etcd Introduction
etcd is a highly‑available distributed key‑value store written in Go. It provides strong consistency through the Raft consensus algorithm and exposes a simple API (HTTP/JSON in the v2 API, gRPC in the v3 API). It is comparable to ZooKeeper and Consul and is commonly used for service discovery, configuration sharing, distributed locks, and leader election.
Service discovery and registration
Configuration center (used by the log‑collection client)
Distributed lock
Leader election
Installation is straightforward: download the appropriate release from the official site, extract, and start the binary.
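Verification example. Note: the `etcdctl` shipped with etcd v3.2 speaks the v2 API by default. Keys written by the Go `clientv3` examples below live in the separate v3 keyspace; inspect them with `ETCDCTL_API=3 ./etcdctl get <key>`.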
[root@localhost etcd-v3.2.18-linux-amd64]# ./etcdctl set name zhaofan
zhaofan
[root@localhost etcd-v3.2.18-linux-amd64]# ./etcdctl get name
zhaofan
Context Overview
In Go, the context package serves two main purposes: propagating timeouts and cancellation to goroutines, and carrying request‑scoped values.
Simple demonstration:
package main
import (
"fmt"
"time"
"net/http"
"context"
"io/ioutil"
)
// Result carries the outcome of an HTTP request performed on a background
// goroutine back to the select statement in process.
type Result struct {
	r   *http.Response
	err error
}

// process issues a GET to http://www.google.com and waits at most two seconds
// for the response, demonstrating context.WithTimeout together with select.
//
// The request is bound to ctx, so when the deadline fires the transport aborts
// the in-flight request itself. This replaces the deprecated
// Transport.CancelRequest. The channel has a buffer of one, so the worker
// goroutine can always deliver its result and exit, even after process has
// returned on timeout.
func process() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	tr := &http.Transport{}
	client := &http.Client{Transport: tr}
	c := make(chan Result, 1)

	req, err := http.NewRequest("GET", "http://www.google.com", nil)
	if err != nil {
		fmt.Println("http request failed, err:", err)
		return
	}
	req = req.WithContext(ctx)

	go func() {
		resp, err := client.Do(req)
		c <- Result{resp, err}
	}()

	select {
	case <-ctx.Done():
		fmt.Println("timeout!")
	case res := <-c:
		// On error res.r is nil; dereferencing its Body would panic.
		if res.err != nil {
			fmt.Println("http request failed, err:", res.err)
			return
		}
		defer res.r.Body.Close()
		out, err := ioutil.ReadAll(res.r.Body)
		if err != nil {
			fmt.Println("read body failed, err:", err)
			return
		}
		fmt.Printf("server response:%s", out)
	}
}
// main runs the context timeout demo implemented by process.
func main() { process() }
Another example shows storing a trace ID in the context and retrieving it in downstream functions.
package main
import (
"github.com/Go-zh/net/context"
"fmt"
)
// traceIDFrom returns the string stored under the "trace_id" key of ctx.
// It returns "" when the key is absent or holds a non-string value, rather
// than panicking on a failed type assertion.
func traceIDFrom(ctx context.Context) string {
	traceID, _ := ctx.Value("trace_id").(string)
	return traceID
}

// add prints the trace ID carried by ctx and returns a + b.
func add(ctx context.Context, a, b int) int {
	fmt.Printf("trace_id:%v\n", traceIDFrom(ctx))
	return a + b
}

// calc prints the trace ID carried by ctx and then delegates to add with the
// same context, showing that request-scoped values flow down the call chain.
func calc(ctx context.Context, a, b int) int {
	fmt.Printf("trace_id:%v\n", traceIDFrom(ctx))
	return add(ctx, a, b)
}
// main stores the trace ID "123456" under the "trace_id" key and calls calc,
// which reads the value back through ctx.Value.
// NOTE(review): staticcheck (SA1029) recommends an unexported key type rather
// than a plain string for context values. Changing it requires updating the
// ctx.Value lookups in add and calc at the same time.
func main() {
	ctx := context.WithValue(context.Background(), "trace_id", "123456")
	calc(ctx, 20, 30)
}
Combining etcd and Context
Connecting to etcd, storing a configuration key, and watching for changes:
package main
import (
etcd_client "github.com/coreos/etcd/clientv3"
"time"
"fmt"
)
func main() {
cli, err := etcd_client.New(etcd_client.Config{Endpoints: []string{"192.168.0.118:2371"}, DialTimeout: 5*time.Second})
if err != nil { fmt.Println("connect failed, err:", err); return }
fmt.Println("connect success")
defer cli.Close()
}Put and get a configuration value:
package main
import (
"github.com/coreos/etcd/clientv3"
"time"
"fmt"
"context"
)
func main() {
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"192.168.0.118:2371"}, DialTimeout: 5*time.Second})
if err != nil { fmt.Println("connect failed, err:", err); return }
fmt.Println("connect succ")
defer cli.Close()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
_, err = cli.Put(ctx, "logagent/conf/", "sample_value")
cancel()
if err != nil { fmt.Println("put failed, err", err); return }
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
resp, err := cli.Get(ctx, "logagent/conf/")
cancel()
if err != nil { fmt.Println("get failed, err:", err); return }
for _, ev := range resp.Kvs { fmt.Printf("%s:%s
", ev.Key, ev.Value) }
}Watching for configuration changes:
package main
import (
"github.com/coreos/etcd/clientv3"
"time"
"fmt"
"context"
)
func main() {
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"192.168.0.118:2371"}, DialTimeout: 5*time.Second})
if err != nil { fmt.Println("connect failed, err:", err); return }
defer cli.Close()
rch := cli.Watch(context.Background(), "logagent/conf/")
for wresp := range rch {
for _, ev := range wresp.Events {
fmt.Printf("%s %q : %q
", ev.Type, ev.Kv.Key, ev.Kv.Value)
}
}
}Kafka Consumer Example
Initial version uses time.Sleep to keep the program alive:
package main
import (
"github.com/Shopify/sarama"
"strings"
"fmt"
"time"
)
func main() {
consumer, err := sarama.NewConsumer(strings.Split("192.168.0.118:9092", ","), nil)
if err != nil { fmt.Println("failed to start consumer:", err); return }
partitionList, err := consumer.Partitions("nginx_log")
if err != nil { fmt.Println("Failed to get partitions:", err); return }
fmt.Println(partitionList)
for _, partition := range partitionList {
pc, err := consumer.ConsumePartition("nginx_log", int32(partition), sarama.OffsetNewest)
if err != nil { fmt.Printf("failed to start consumer for partition %d:%s
", partition, err); return }
defer pc.AsyncClose()
go func(pc sarama.PartitionConsumer) {
for msg := range pc.Messages() {
fmt.Printf("partition:%d Offset:%d Key:%s Value:%s", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
}
}(pc)
}
time.Sleep(time.Hour)
consumer.Close()
}Improved version replaces time.Sleep with a sync.WaitGroup to wait for all goroutines:
package main
import (
"github.com/Shopify/sarama"
"strings"
"fmt"
"sync"
)
var wg sync.WaitGroup
func main() {
consumer, err := sarama.NewConsumer(strings.Split("192.168.0.118:9092", ","), nil)
if err != nil { fmt.Println("failed to start consumer:", err); return }
partitionList, err := consumer.Partitions("nginx_log")
if err != nil { fmt.Println("Failed to get partitions:", err); return }
fmt.Println(partitionList)
for _, partition := range partitionList {
pc, err := consumer.ConsumePartition("nginx_log", int32(partition), sarama.OffsetNewest)
if err != nil { fmt.Printf("failed to start consumer for partition %d:%s
", partition, err); return }
defer pc.AsyncClose()
go func(pc sarama.PartitionConsumer) {
wg.Add(1)
for msg := range pc.Messages() {
fmt.Printf("partition:%d Offset:%d Key:%s Value:%s", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
}
wg.Done()
}(pc)
}
wg.Wait()
consumer.Close()
}Storing Log Configuration in etcd
Initialization, watching, and retrieval functions are provided to load log‑collection settings from etcd and react to updates:
package main
import (
"github.com/coreos/etcd/clientv3"
"time"
"github.com/astaxie/beego/logs"
"context"
"fmt"
)
// Client is the etcd connection opened by initEtcd and used by WatchEtcd.
// logConfChan carries configuration values loaded from or watched in etcd;
// consumers obtain it through GetLogConf.
var (
	Client      *clientv3.Client
	logConfChan chan string
)
func initEtcd(addr []string, keyfmt string, timeout time.Duration) (err error) {
var keys []string
for _, ip := range ipArrays {
keys = append(keys, fmt.Sprintf(keyfmt, ip))
}
logConfChan = make(chan string, 10)
logs.Debug("etcd watch key:%v timeout:%v", keys, timeout)
Client, err = clientv3.New(clientv3.Config{Endpoints: addr, DialTimeout: timeout})
if err != nil { logs.Error("connect failed, err:%v", err); return }
logs.Debug("init etcd success")
waitGroup.Add(1)
for _, key := range keys {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
resp, err := Client.Get(ctx, key)
cancel()
if err != nil { logs.Warn("get key %s failed, err:%v", key, err); continue }
for _, ev := range resp.Kvs {
logs.Debug("%q : %q
", ev.Key, ev.Value)
logConfChan <- string(ev.Value)
}
}
go WatchEtcd(keys)
return
}
func WatchEtcd(keys []string) {
var watchChans []clientv3.WatchChan
for _, key := range keys {
rch := Client.Watch(context.Background(), key)
watchChans = append(watchChans, rch)
}
for {
for _, watchC := range watchChans {
select {
case wresp := <-watchC:
for _, ev := range wresp.Events {
logs.Debug("%s %q : %q
", ev.Type, ev.Kv.Key, ev.Kv.Value)
logConfChan <- string(ev.Kv.Value)
}
default:
}
}
time.Sleep(time.Second)
}
waitGroup.Done()
}
func GetLogConf() chan string { return logConfChan }Rate‑Limiting Utility
A simple per‑second limiter prevents the log collector from overwhelming the business workload:
package main
import (
"time"
"sync/atomic"
"github.com/astaxie/beego/logs"
)
// SecondLimit is a fixed-window rate limiter. It counts the work added during
// the current Unix second so that callers can block once limit is reached.
//
// unixSecond stays the first field so it is 64-bit aligned for the atomic
// operations performed on it, which sync/atomic requires on 32-bit platforms.
type SecondLimit struct {
	unixSecond int64 // Unix second the current window belongs to
	curCount   int32 // units added during unixSecond
	limit      int32 // maximum units allowed per second
}

// NewSecondLimit returns a limiter that allows limit units per second. Its
// first window starts at the current second with a zero count.
func NewSecondLimit(limit int32) *SecondLimit {
	return &SecondLimit{unixSecond: time.Now().Unix(), limit: limit}
}

// Add records count units of work in the current one-second window. It starts
// a new window when the second has changed since the window was recorded.
//
// Every access to unixSecond and curCount goes through sync/atomic, so Add
// does not race with concurrent readers of those fields. The window rollover
// uses compare-and-swap, so only one caller resets the counter.
// NOTE: the swap and the counter reset are still two separate steps, so an Add
// landing between them can be lost. Use a mutex if exact counts matter.
func (s *SecondLimit) Add(count int) {
	sec := time.Now().Unix()
	window := atomic.LoadInt64(&s.unixSecond)
	if sec == window {
		atomic.AddInt32(&s.curCount, int32(count))
		return
	}
	if atomic.CompareAndSwapInt64(&s.unixSecond, window, sec) {
		atomic.StoreInt32(&s.curCount, int32(count))
		return
	}
	// Another goroutine rolled the window forward first; count into it.
	atomic.AddInt32(&s.curCount, int32(count))
}
func (s *SecondLimit) Wait() bool {
for {
sec := time.Now().Unix()
if sec == atomic.LoadInt64(&s.unixSecond) && s.curCount == s.limit {
time.Sleep(time.Microsecond)
logs.Debug("limit is running, limit:%d curCount:%d", s.limit, s.curCount)
continue
}
if sec != atomic.LoadInt64(&s.unixSecond) {
atomic.StoreInt64(&s.unixSecond, sec)
atomic.StoreInt32(&s.curCount, 0)
}
logs.Debug("limit is exited")
return false
}
}Conclusion
The first half of the log‑collection pipeline—gathering logs, storing configuration in etcd, handling context timeouts, consuming Kafka messages, and applying rate limiting—has been implemented. The next steps will push the logs into Elasticsearch and render them on a dashboard.
Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact us and we will review it promptly.
21CTO
21CTO (21CTO.com) offers developers community, training, and services, making it your go‑to learning and service platform.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
