How to Fix Critical Bugs in a Go Goroutine Pool and Boost Scaling Efficiency
This article analyzes several bugs found in a Go goroutine pool implementation (an incorrect active count, mismatched execution counters, slow scaling during QPS spikes, and inefficient recycling) and presents the code fixes and redesigns that yield accurate metrics and faster, more graceful worker management.
Background and Problem Statement
The author previously wrote a Go goroutine pool that adapts to dynamic QPS, but testing revealed several bugs that needed urgent correction.
Identified Defects
Incorrect calculation of the active goroutine count.
Incorrect counters for executed and received tasks.
Inability to add or remove workers promptly when QPS changes rapidly.
Inefficient, inelegant goroutine recycling.
Bug Analysis
Active Goroutine Count
ReduceWorker() and AddWorker() are called repeatedly and at different moments, so Active is not changed at the exact point where a worker goroutine starts or exits. The counter therefore drifts away from the number of goroutines actually running.
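One way to keep Active honest, sketched here as an assumption rather than the author's code, is to change the counter only where a worker really starts or exits, and to make the conditional "exit only if above Min" decrement a single compare-and-swap. The hypothetical helper tryRetire below simulates 50 idle workers all trying to retire at once with a minimum of 10; because the check and the decrement happen as one atomic step, exactly 40 retire and the counter ends at 10.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// tryRetire is a hypothetical helper, not part of the original pool.
// It decrements *active only if the result stays at or above min. The CAS
// loop turns "check, then decrement" into one atomic step, so concurrent
// callers cannot all pass the check and push the count below min.
func tryRetire(active *int32, min int32) bool {
	for {
		n := atomic.LoadInt32(active)
		if n <= min {
			return false
		}
		if atomic.CompareAndSwapInt32(active, n, n-1) {
			return true
		}
	}
}

func main() {
	var active int32 = 50 // assumed: 50 live workers
	const minWorkers = 10
	var retired int32
	var wg sync.WaitGroup
	// Every worker goes idle at the same moment and tries to exit.
	for i := 0; i < 50; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if tryRetire(&active, minWorkers) {
				atomic.AddInt32(&retired, 1)
			}
		}()
	}
	wg.Wait()
	fmt.Println(active, retired) // prints "10 40"
}
```

A plain `if active > min { active-- }` run from 50 goroutines could let several of them pass the check before any decrement lands, which is the over-recycling pattern described later.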
Execution and Received Counters
ExecuteQps merges up to SingleTimes calls of t into a single submitted task, but the pool counts that task only once instead of counting each original call; the global ReceiveTotal counter has the same problem.
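A simplified model makes the size of the discrepancy concrete. batchSizes is a hypothetical helper, not from the original code; it splits qps calls the way ExecuteQps does (full batches of SingleTimes plus a remainder batch), except that it skips an empty remainder, which the original submits anyway. With qps = 25 and SingleTimes = 10 the pool receives three tasks, so a counter that adds 1 per received task reports 3 while 25 calls actually run.

```go
package main

import "fmt"

// batchSizes is a hypothetical model of ExecuteQps batching: full batches
// of singleTimes calls, then one remainder batch if the remainder is non-zero.
func batchSizes(qps, singleTimes int) []int {
	var sizes []int
	for i := 0; i < qps/singleTimes; i++ {
		sizes = append(sizes, singleTimes)
	}
	if r := qps % singleTimes; r > 0 {
		sizes = append(sizes, r)
	}
	return sizes
}

func main() {
	sizes := batchSizes(25, 10)
	perBatch := len(sizes) // what a "+1 per received task" counter reports
	perCall := 0           // what should be reported: one per call of t
	for _, n := range sizes {
		perCall += n
	}
	fmt.Println(sizes, perBatch, perCall) // prints "[10 10 5] 3 25"
}
```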
func (pool *GorotinesPool) ExecuteQps(t func(), qps int) {
	mutiple := qps / pool.SingleTimes
	remainder := qps % pool.SingleTimes
	for i := 0; i < mutiple; i++ {
		pool.Execute(func() {
			for i := 0; i < pool.SingleTimes; i++ {
				t()
			}
		})
	}
	pool.Execute(func() {
		for i := 0; i < remainder; i++ {
			t()
		}
	})
}
Slow Goroutine Creation During QPS Spikes
When QPS rises or falls sharply, the balancing loop checks the task channel only once per second and adds or removes a single worker per check, which is far too slow: growing from 10 to 200 workers, for example, takes about 190 seconds.
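The arithmetic can be checked with a small simulation. secondsToScale is a hypothetical helper, not part of the pool; it assumes one balancing pass per second and a queue that always holds enough tasks to justify the target size. With stepAll set to false it adds one worker per pass, as the original balance does; with stepAll set to true it adds everything the backlog needs in a single pass, capped at Max.

```go
package main

import "fmt"

// secondsToScale is a hypothetical model: it counts one-per-second balancing
// passes needed to grow from active to target workers (never beyond max).
// stepAll=false adds one worker per pass; stepAll=true adds the whole
// shortfall at once, capped by the remaining room under max.
func secondsToScale(active, target, max int, stepAll bool) int {
	seconds := 0
	for active < target && active < max {
		seconds++
		if !stepAll {
			active++
			continue
		}
		need := target - active
		if room := max - active; need > room {
			need = room
		}
		active += need
	}
	return seconds
}

func main() {
	fmt.Println(secondsToScale(10, 200, 500, false), secondsToScale(10, 200, 500, true)) // prints "190 1"
}
```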
Inelegant Recycling
The original implementation polled the task channel to decide whether to add or remove workers. This was inefficient, recycled idle workers late, and could even over-recycle, sending termination signals to goroutines that were still active.
func (pool *GorotinesPool) balance() {
	if pool.status {
		if len(pool.tasks) > 0 && pool.active < int32(pool.Max) {
			pool.AddWorker()
		}
		if len(pool.tasks) == 0 && pool.active > int32(pool.Min) {
			pool.ReduceWorker()
		}
	}
}
Solution Overview
Rewritten Recycling Method
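Borrowing from Java thread pools (the keepAliveTime setting of ThreadPoolExecutor), the revised pool introduces a maximum idle time, MaxIdle. A worker exits only after it has waited MaxIdle without receiving a task, and only while more than Min workers are alive. This prevents premature or excessive termination while still reclaiming idle goroutines promptly.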
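func (pool *GorotinesPool) worker() {
	idle := time.NewTimer(pool.MaxIdle) // reused, unlike a new time.After per iteration
	defer idle.Stop()
	for {
		select {
		case t, ok := <-pool.tasks:
			if !ok { // tasks was closed by Wait(): exit and release the slot
				atomic.AddInt32(&pool.Active, -1)
				return
			}
			atomic.AddInt32(&pool.ExecuteTotal, 1)
			func() {
				// Recover per task so a panic neither kills the worker nor skews Active.
				defer func() {
					if p := recover(); p != nil {
						log.Printf("execute task fail: %v", p)
					}
				}()
				t()
			}()
		case <-idle.C:
			// Idle for MaxIdle: exit only if a CAS claims a slot above Min, so
			// several idle workers cannot all pass the check and undershoot Min.
			for n := atomic.LoadInt32(&pool.Active); n > int32(pool.Min); n = atomic.LoadInt32(&pool.Active) {
				if atomic.CompareAndSwapInt32(&pool.Active, n, n-1) {
					return
				}
			}
		}
		// Restart the idle countdown, draining a tick that may already be pending.
		if !idle.Stop() {
			select {
			case <-idle.C:
			default:
			}
		}
		idle.Reset(pool.MaxIdle)
	}
}
Improved Goroutine Scaling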
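On each balancing pass, add as many workers as there are tasks waiting in the buffered task channel (capped at Max) in one go, instead of one worker per pass.
Keep merging calls into fixed-size batches of SingleTimes in ExecuteQps to reduce per-task overhead, but make ExecuteTotal count every call of t: each batch first cancels the +1 the worker recorded when it received the batch, then adds 1 per call.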
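func (pool *GorotinesPool) balance() {
	if !pool.status {
		return
	}
	// Add one worker per queued task in a single pass, capped at Max.
	// The backlog is read once because workers drain the queue concurrently.
	backlog := len(pool.tasks)
	room := pool.Max - int(atomic.LoadInt32(&pool.Active))
	for i := 0; i < backlog && i < room; i++ {
		pool.AddWorker()
	}
}

func (pool *GorotinesPool) ExecuteQps(t func(), qps int) {
	multiple := qps / pool.SingleTimes
	remainder := qps % pool.SingleTimes
	// batch submits one task that calls t n times. The worker adds 1 to
	// ExecuteTotal when it receives the batch, so the batch cancels that
	// increment and then adds 1 per call of t.
	batch := func(n int) {
		err := pool.Execute(func() {
			atomic.AddInt32(&pool.ExecuteTotal, -1)
			for i := 0; i < n; i++ {
				atomic.AddInt32(&pool.ExecuteTotal, 1)
				t()
			}
		})
		if err != nil {
			log.Printf("submit batch fail: %v", err)
		}
	}
	for i := 0; i < multiple; i++ {
		batch(pool.SingleTimes)
	}
	if remainder > 0 { // skip the empty batch the original always submitted
		batch(remainder)
	}
}
Complete Revised Code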
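package execute

import (
	"errors"
	"log"
	"sync/atomic"
	"time"
)

// GorotinesPool is a goroutine pool whose size floats between Min and Max.
type GorotinesPool struct {
	Max          int           // upper bound on live workers
	Min          int           // workers kept alive even when idle
	tasks        chan func()   // buffered queue of pending tasks
	status       bool          // true while the pool accepts tasks (plain bool, so not race-free)
	Active       int32         // live workers; accessed only through sync/atomic
	ExecuteTotal int32         // executed tasks; accessed only through sync/atomic
	SingleTimes  int           // batch size used by ExecuteQps
	addTimeout   time.Duration // how long Execute waits for queue space
	MaxIdle      time.Duration // idle time after which a surplus worker exits
}

type taskType int

const (
	normal taskType = 0
	reduce taskType = 1
)

func GetPool(max, min, maxWaitTask, timeout, maxIdle int) *GorotinesPool {
	p := &GorotinesPool{
		Max:         max,
		Min:         min,
		tasks:       make(chan func(), maxWaitTask),
		status:      true,
		SingleTimes: 10,
		addTimeout:  time.Duration(timeout) * time.Second,
		MaxIdle:     time.Duration(maxIdle) * time.Second,
	}
	for i := 0; i < min; i++ {
		p.AddWorker()
	}
	// Rebalance once per second until Wait() shuts the pool down.
	go func() {
		for p.status {
			time.Sleep(time.Second) // replaces ftool.Sleep(1000)
			p.balance()
		}
	}()
	return p
}

func (pool *GorotinesPool) worker() {
	idle := time.NewTimer(pool.MaxIdle) // reused, unlike a new time.After per iteration
	defer idle.Stop()
	for {
		select {
		case t, ok := <-pool.tasks:
			if !ok { // tasks was closed by Wait(): exit and release the slot
				atomic.AddInt32(&pool.Active, -1)
				return
			}
			atomic.AddInt32(&pool.ExecuteTotal, 1)
			func() {
				// Recover per task so a panic neither kills the worker nor skews Active.
				defer func() {
					if p := recover(); p != nil {
						log.Printf("execute task fail: %v", p)
					}
				}()
				t()
			}()
		case <-idle.C:
			// Idle for MaxIdle: exit only if a CAS claims a slot above Min.
			for n := atomic.LoadInt32(&pool.Active); n > int32(pool.Min); n = atomic.LoadInt32(&pool.Active) {
				if atomic.CompareAndSwapInt32(&pool.Active, n, n-1) {
					return
				}
			}
		}
		// Restart the idle countdown, draining a tick that may already be pending.
		if !idle.Stop() {
			select {
			case <-idle.C:
			default:
			}
		}
		idle.Reset(pool.MaxIdle)
	}
}

func (pool *GorotinesPool) Execute(t func()) error {
	if !pool.status {
		return errors.New("pool is down")
	}
	select {
	case pool.tasks <- t: // no need to wrap t in another closure
		return nil
	case <-time.After(pool.addTimeout):
		return errors.New("add tasks timeout")
	}
}

// Wait stops accepting tasks, lets the workers drain the queue, and
// returns once every worker has exited.
func (pool *GorotinesPool) Wait() {
	pool.status = false
	// Drain the queue; give up if no worker is left to run it (possible when Min is 0).
	for len(pool.tasks) > 0 && atomic.LoadInt32(&pool.Active) > 0 {
		time.Sleep(time.Second)
	}
	close(pool.tasks) // workers see the closed channel and exit
	// Wait for in-flight tasks so the final count is complete.
	for atomic.LoadInt32(&pool.Active) > 0 {
		time.Sleep(100 * time.Millisecond)
	}
	log.Printf("execute: %d", atomic.LoadInt32(&pool.ExecuteTotal))
}

func (pool *GorotinesPool) AddWorker() {
	atomic.AddInt32(&pool.Active, 1)
	go pool.worker()
}

func (pool *GorotinesPool) balance() {
	if !pool.status {
		return
	}
	// Add one worker per queued task in a single pass, capped at Max.
	backlog := len(pool.tasks)
	room := pool.Max - int(atomic.LoadInt32(&pool.Active))
	for i := 0; i < backlog && i < room; i++ {
		pool.AddWorker()
	}
}

func (pool *GorotinesPool) ExecuteQps(t func(), qps int) {
	multiple := qps / pool.SingleTimes
	remainder := qps % pool.SingleTimes
	// Each batch cancels the +1 the worker recorded on receipt, then adds 1 per call.
	batch := func(n int) {
		err := pool.Execute(func() {
			atomic.AddInt32(&pool.ExecuteTotal, -1)
			for i := 0; i < n; i++ {
				atomic.AddInt32(&pool.ExecuteTotal, 1)
				t()
			}
		})
		if err != nil {
			log.Printf("submit batch fail: %v", err)
		}
	}
	for i := 0; i < multiple; i++ {
		batch(pool.SingleTimes)
	}
	if remainder > 0 {
		batch(remainder)
	}
}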
