Understanding Kafka Consumer Groups, Polling, and Offset Commit Strategies with kafka-go
The article demonstrates how kafka-go implements consumer groups by creating readers that spawn poll goroutines per partition, explains heartbeat‑driven rebalancing, and compares automatic versus manual offset‑commit strategies, highlighting their impact on throughput, reliability, and message‑processing semantics.
To facilitate testing, a topic named test-topic with 4 partitions and 7 messages is created using the kowl GUI or the Kafka CLI.
The basic consumer code using kafka.NewReader reads messages from the topic and prints their key/value along with partition and offset information.
package main

import (
	"context"
	"log"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	r := kafka.NewReader(kafka.ReaderConfig{
		GroupTopics:    []string{"test-topic"},
		Brokers:        []string{"localhost:9092"},
		GroupID:        "test-consumer-group",
		SessionTimeout: time.Second * 6,
	})
	ctx := context.Background()
	for {
		m, err := r.FetchMessage(ctx)
		if err != nil {
			log.Fatal(err)
		}
		log.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
	}
}

Running the program produces output similar to:
2022/06/07 15:13:19 message at topic/partition/offset test-topic/2/0: Key-C = Value-C
2022/06/07 15:13:19 message at topic/partition/offset test-topic/2/1: Key-G = Value-G
2022/06/07 15:13:19 message at topic/partition/offset test-topic/0/0: Key-A = Value-A
2022/06/07 15:13:19 message at topic/partition/offset test-topic/0/1: Key-E = Value-E
2022/06/07 15:13:19 message at topic/partition/offset test-topic/1/0: Key-B = Value-B
2022/06/07 15:13:19 message at topic/partition/offset test-topic/1/1: Key-F = Value-F
2022/06/07 15:13:19 message at topic/partition/offset test-topic/3/0: Key-D = Value-D

The consumer group test-consumer-group initially contains a single consumer, as shown in the kowl UI. In kafka-go, each kafka.Reader instance represents one consumer; creating multiple readers creates multiple consumers:
var wg sync.WaitGroup
wg.Add(2)
r1 := kafka.NewReader(kafka.ReaderConfig{GroupTopics: []string{"test-topic"}, Brokers: []string{"localhost:9092"}, GroupID: "test-consumer-group", SessionTimeout: time.Second * 6})
r2 := kafka.NewReader(kafka.ReaderConfig{GroupTopics: []string{"test-topic"}, Brokers: []string{"localhost:9092"}, GroupID: "test-consumer-group", SessionTimeout: time.Second * 6})
ctx := context.Background()
go func() { defer wg.Done(); for { m, err := r1.FetchMessage(ctx); if err != nil { log.Fatal(err) }; log.Printf("r1: %s = %s", string(m.Key), string(m.Value)) } }()
go func() { defer wg.Done(); for { m, err := r2.FetchMessage(ctx); if err != nil { log.Fatal(err) }; log.Printf("r2: %s = %s", string(m.Key), string(m.Value)) } }()
wg.Wait()

During consumption, kafka-go creates one poll goroutine per assigned partition (four pollWait goroutines for the four partitions). Each goroutine fetches data from its partition and pushes the messages onto a shared r.msgs channel, from which r.FetchMessage() reads.
Rebalancing is driven by the heartbeat mechanism. Each consumer runs a dedicated heartbeat goroutine that periodically sends a heartbeat request to the coordinator. When the coordinator returns a RebalanceInProgress (27) error, the heartbeat goroutine exits, all poll goroutines are stopped, and the consumer leaves the group. A new generation is then started: the consumer rejoins the group, partitions are reassigned, and fresh poll goroutines are spawned.
Offset management is crucial for reliable consumption. Without committing offsets, the consumer only tracks progress in memory; a restart or rebalance would reset the counter and cause duplicate processing. Offsets are persisted in the internal __consumer_offsets topic.
Kafka‑go supports two commit strategies:
Automatic commit: set CommitInterval in the reader config. The library starts a commitLoopInterval goroutine that batches offsets received on r.commits, selects the highest offset per partition, and commits them at the configured interval.
// Auto-commit example (FetchMessage + CommitMessages): with CommitInterval set,
// CommitMessages only queues the offset; the interval goroutine commits it later.
r := kafka.NewReader(kafka.ReaderConfig{GroupTopics: []string{"test-topic"}, Brokers: []string{"localhost:9092"}, GroupID: "test-consumer-group", SessionTimeout: time.Second * 6, CommitInterval: time.Second})
ctx := context.Background()
for {
	m, err := r.FetchMessage(ctx)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("msg %s", string(m.Value))
	if err := r.CommitMessages(ctx, m); err != nil {
		log.Fatal(err)
	}
}
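The collapse-to-highest-offset batching that the interval commit loop performs can be sketched as follows. This is a simplified model of the behavior, not kafka-go's exact internals; `commitRequest` and `highestPerPartition` are our own names.

```go
package main

import "fmt"

// commitRequest pairs a partition with an offset queued for commit,
// modeling what the interval loop accumulates from r.commits.
type commitRequest struct {
	Partition int
	Offset    int64
}

// highestPerPartition collapses a batch of queued commits down to the
// highest offset per partition, which is all the broker needs to store.
func highestPerPartition(pending []commitRequest) map[int]int64 {
	out := make(map[int]int64)
	for _, c := range pending {
		if cur, ok := out[c.Partition]; !ok || c.Offset > cur {
			out[c.Partition] = c.Offset
		}
	}
	return out
}

func main() {
	pending := []commitRequest{
		{Partition: 0, Offset: 3}, {Partition: 0, Offset: 5},
		{Partition: 1, Offset: 2}, {Partition: 0, Offset: 4},
	}
	fmt.Println(highestPerPartition(pending)) // map[0:5 1:2]
}
```

Batching this way is what buys the throughput: one commit request per interval covers every message fetched since the last tick.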
// Auto-commit using ReadMessage (fetches and queues the commit in one call)
for {
	m, err := r.ReadMessage(ctx) // commits automatically
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("msg %s", string(m.Value))
}

Manual commit: omit CommitInterval. The application explicitly calls CommitMessages after processing. The library runs a commitLoopImmediate goroutine that reads offsets from r.commits and commits them immediately, returning any error through a dedicated channel so the call is synchronous.
// Manual-commit example (FetchMessage + CommitMessages)
r := kafka.NewReader(kafka.ReaderConfig{GroupTopics: []string{"test-topic"}, Brokers: []string{"localhost:9092"}, GroupID: "test-consumer-group", SessionTimeout: time.Second * 6})
ctx := context.Background()
for {
	m, err := r.FetchMessage(ctx)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("msg %s", string(m.Value))
	if err := r.CommitMessages(ctx, m); err != nil {
		log.Fatal(err)
	}
}
// With ReadMessage, no extra call is needed: in a consumer group, ReadMessage
// commits internally, and when CommitInterval is zero it does so synchronously
// before returning. Use FetchMessage + CommitMessages to control commit timing.
for {
	m, err := r.ReadMessage(ctx)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("msg %s", string(m.Value))
}

The choice between automatic and manual offset commit directly influences consumption throughput and processing semantics. Interval-based commits improve throughput, but offsets queued and not yet committed when a consumer fails are re-delivered after the rebalance, so messages may be processed more than once; synchronous manual commits give precise control over when an offset is durable, at the cost of extra code and a broker round trip per commit.
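The synchronous hand-off that makes a manual CommitMessages call blocking can be modeled with a per-request error channel. This is a sketch of the idea only; `commitReq` and `commitSynchronously` are our own names, and the loop body stands in for the real broker round trip performed by the commitLoopImmediate goroutine described above.

```go
package main

import (
	"errors"
	"fmt"
)

// commitReq carries an offset to commit plus a channel on which the commit
// goroutine reports the outcome; the reply channel is what makes the caller
// synchronous.
type commitReq struct {
	offset int64
	errc   chan error
}

// commitLoopImmediate commits each request as it arrives and sends the
// result back on the request's own error channel, unblocking the caller.
func commitLoopImmediate(commits <-chan commitReq) {
	for c := range commits {
		var err error
		if c.offset < 0 { // stand-in for a real commit round trip failing
			err = errors.New("invalid offset")
		}
		c.errc <- err
	}
}

// commitSynchronously sends one request and blocks until it is processed.
func commitSynchronously(commits chan<- commitReq, offset int64) error {
	req := commitReq{offset: offset, errc: make(chan error, 1)}
	commits <- req
	return <-req.errc
}

func main() {
	commits := make(chan commitReq)
	go commitLoopImmediate(commits)
	if err := commitSynchronously(commits, 42); err != nil {
		fmt.Println("commit failed:", err)
	} else {
		fmt.Println("offset 42 committed synchronously")
	}
}
```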
37 Interactive Technology Team