Designing a Go Coroutine Pool Based on Java ThreadPoolExecutor
This article explains how to implement a high‑performance Go coroutine pool by adapting key features of Java’s ThreadPoolExecutor. It covers core parameters, task queue design, dynamic scaling, worker management, and timeout handling, and it includes complete code examples and self‑test results.
Background and Motivation
Performance testing requires concurrent execution of many test cases to simulate real‑world load. While Java’s java.util.concurrent.ThreadPoolExecutor is a classic solution, the author wanted a similar, lightweight pool for Go, leveraging goroutine efficiency while adding scheduling and load‑protection capabilities.
Key Features of Java ThreadPoolExecutor
The original executor defines several important parameters:
Core pool size and maximum pool size – control the number of threads.
Keep‑alive time and its time unit – recycle idle threads.
Work queue – holds pending tasks.
Thread factory and rejection handler – omitted in the Go version because they are rarely needed.
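To make the interplay of these parameters concrete, here is a minimal sketch in Go of the order in which ThreadPoolExecutor handles a newly submitted task: core threads first, then the queue, then extra threads up to the maximum, and finally rejection. The names `admit`, `decision`, and the parameter names are hypothetical and exist only for illustration; this models the decision order, not the Java implementation.

```go
package main

import "fmt"

// decision names what a ThreadPoolExecutor-style pool does with a new task.
type decision string

const (
	startCoreWorker  decision = "start core worker"
	enqueue          decision = "enqueue"
	startExtraWorker decision = "start extra worker"
	reject           decision = "reject"
)

// admit applies the ordering: fill core workers, then the queue,
// then grow up to the maximum, otherwise reject.
// All names here are illustrative assumptions.
func admit(workers, coreSize, maxSize, queued, queueCap int) decision {
	switch {
	case workers < coreSize:
		return startCoreWorker
	case queued < queueCap:
		return enqueue
	case workers < maxSize:
		return startExtraWorker
	default:
		return reject
	}
}

func main() {
	fmt.Println(admit(0, 2, 4, 0, 10))  // start core worker
	fmt.Println(admit(2, 2, 4, 3, 10))  // enqueue
	fmt.Println(admit(2, 2, 4, 10, 10)) // start extra worker
	fmt.Println(admit(4, 2, 4, 10, 10)) // reject
}
```

Note that in this ordering the pool grows past the core size only once the queue is full, which is one of the behaviors a Go adaptation may choose to change.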
Design of the Go Coroutine Pool
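The Go pool mirrors the Java design with two main attributes: Min, which plays the role of Java’s core pool size, and Max. It adds a waiting queue (a buffered channel), task type discrimination (ordinary tasks versus shrink signals), a timeout on task submission, an atomic count of active goroutines, and a pool status flag.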
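The idea of task type discrimination can be shown in isolation: every queue entry returns a type, and a worker that runs a shrink signal retires instead of continuing. The following is a minimal sketch under that assumption; the helper `demo` is hypothetical and the worker is reduced to the bare mechanism.

```go
package main

import (
	"fmt"
	"sync"
)

type taskType int

const (
	normal taskType = iota // ordinary task
	reduce                 // signal asking one worker to exit
)

// demo runs one worker over a small queue and reports how many normal
// tasks it executed and how many entries were left after it retired.
func demo() (executed, leftover int) {
	tasks := make(chan func() taskType, 4)
	tasks <- func() taskType { return normal }
	tasks <- func() taskType { return normal }
	tasks <- func() taskType { return reduce } // shrink signal
	tasks <- func() taskType { return normal } // stays queued for another worker

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for t := range tasks {
			if t() == reduce {
				return // this worker retires; the rest of the queue is untouched
			}
			executed++
		}
	}()
	wg.Wait()
	return executed, len(tasks)
}

func main() {
	executed, leftover := demo()
	fmt.Println(executed, leftover) // prints "2 1"
}
```

Sending the signal through the same channel as the tasks means no separate control channel is needed, at the cost that a signal waits behind any tasks already queued.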
Data Structures
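package execute

import (
	"errors"
	"log"
	"sync/atomic"
	"time"
	// plus the author's ftool helper package (import path not shown in the source)
)

type GorotinesPool struct {
	Max          int                  // upper bound on worker goroutines
	Min          int                  // workers kept alive when idle
	tasks        chan func() taskType // buffered waiting queue
	status       bool                 // true while the pool accepts tasks
	active       int32                // current worker count, updated atomically
	ReceiveTotal int32                // tasks accepted by Execute
	ExecuteTotal int32                // tasks run by workers
	addTimeout   time.Duration        // how long Execute waits for queue space
}

// taskType tells a worker whether it ran a real task or a shrink signal.
type taskType int

const (
	normal taskType = 0 // ordinary user task
	reduce taskType = 1 // signal asking one worker to exit
)
Constructor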
The constructor creates the channel for tasks, starts the minimum number of workers, and launches a background balancer goroutine.
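// GetPool creates a pool. maxWaitTask is the queue capacity and
// timeout is the submit timeout in seconds.
func GetPool(max, min, maxWaitTask, timeout int) *GorotinesPool {
	p := &GorotinesPool{
		Max:          max,
		Min:          min,
		tasks:        make(chan func() taskType, maxWaitTask),
		status:       true,
		active:       0,
		ReceiveTotal: 0,
		ExecuteTotal: 0,
		addTimeout:   time.Duration(timeout) * time.Second,
	}
	// Start the minimum number of workers up front.
	for i := 0; i < min; i++ {
		atomic.AddInt32(&p.active, 1)
		go p.worker()
	}
	// Background balancer: re-evaluates the pool size after each sleep
	// until the pool is shut down.
	go func() {
		for {
			if !p.status {
				break
			}
			ftool.Sleep(1000)
			p.balance()
		}
	}()
	return p
}
Dynamic Scaling (Add/Reduce Workers)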
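The scaling rule in this section can be read as a pure decision function: grow while tasks are waiting and the pool is below Max, shrink while the queue is empty and the pool is above Min. The sketch below is not the author's code. `scaleStep` and its parameter names are hypothetical, and it assumes at most one worker is added or retired per balancer tick.

```go
package main

import "fmt"

// scaleStep returns +1 to add a worker, -1 to retire one, and 0 to do nothing.
// It models a single balancer tick; all names are illustrative assumptions.
func scaleStep(queued, active, minWorkers, maxWorkers int) int {
	switch {
	case queued > 0 && active < maxWorkers:
		return 1
	case queued == 0 && active > minWorkers:
		return -1
	default:
		return 0
	}
}

func main() {
	fmt.Println(scaleStep(5, 1, 1, 3)) // 1: backlog and room to grow
	fmt.Println(scaleStep(5, 3, 1, 3)) // 0: backlog but already at the maximum
	fmt.Println(scaleStep(0, 3, 1, 3)) // -1: idle and above the minimum
	fmt.Println(scaleStep(0, 1, 1, 3)) // 0: idle at the minimum
}
```

Because each tick changes the pool by at most one worker, the pool reacts to a burst gradually rather than jumping straight to its maximum.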
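// AddWorker starts one more worker and counts it.
func (pool *GorotinesPool) AddWorker() {
	atomic.AddInt32(&pool.active, 1)
	go pool.worker()
}

// ReduceWorker lowers the count and queues a reduce signal;
// whichever worker receives the signal is asked to exit.
func (pool *GorotinesPool) ReduceWorker() {
	atomic.AddInt32(&pool.active, -1)
	pool.tasks <- func() taskType { return reduce }
}

// balance grows the pool while tasks are waiting and shrinks it
// while the queue is empty, staying within [Min, Max].
func (pool *GorotinesPool) balance() {
	if !pool.status {
		return
	}
	// active is written with atomic operations, so read it the same way.
	if len(pool.tasks) > 0 && atomic.LoadInt32(&pool.active) < int32(pool.Max) {
		pool.AddWorker()
	}
	if len(pool.tasks) == 0 && atomic.LoadInt32(&pool.active) > int32(pool.Min) {
		pool.ReduceWorker()
	}
}
Worker Loop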
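A deferred recover at the top of a goroutine stops a panic from crashing the program, but it also ends that goroutine's loop. One common alternative, sketched here with a hypothetical helper `safeRun`, is to recover around each task so the worker keeps serving the queue. This is an illustration of the technique, not part of the author's pool.

```go
package main

import (
	"fmt"
	"sync"
)

// safeRun executes task and converts a panic into an error,
// so the calling worker loop can keep going.
func safeRun(task func()) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("task panicked: %v", r)
		}
	}()
	task()
	return nil
}

func main() {
	tasks := make(chan func(), 3)
	tasks <- func() {}
	tasks <- func() { panic("boom") }
	tasks <- func() {}
	close(tasks)

	var wg sync.WaitGroup
	done, failed := 0, 0
	wg.Add(1)
	go func() {
		defer wg.Done()
		for t := range tasks {
			if err := safeRun(t); err != nil {
				failed++ // the worker survives and moves on
				continue
			}
			done++
		}
	}()
	wg.Wait()
	fmt.Println(done, failed) // prints "2 1"
}
```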
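func (pool *GorotinesPool) worker() {
	defer func() {
		if p := recover(); p != nil {
			// A panicking task ends this worker, so release its slot in the count.
			atomic.AddInt32(&pool.active, -1)
			log.Printf("execute task fail: %v", p)
		}
	}()
Fun:
	for t := range pool.tasks {
		switch t() {
		case normal:
			// active counts workers, not tasks, so only the task counter changes.
			atomic.AddInt32(&pool.ExecuteTotal, 1)
		case reduce:
			// ReduceWorker already lowered active before sending this signal,
			// so the worker just exits.
			break Fun
		}
	}
}
Graceful Shutdown and Wait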
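Polling the queue length shows that the queue is empty, not that the tasks taken from it have finished. One way to wait for in-flight tasks as well is a sync.WaitGroup around the workers. The sketch below is independent of the pool in this article; `runAll` is a hypothetical helper used only to demonstrate the pattern.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runAll starts the given number of workers, feeds them n tasks, and
// returns only after every task has finished. It reports how many completed.
func runAll(workers, n int) int32 {
	tasks := make(chan func(), n)
	var wg sync.WaitGroup
	var completed int32

	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for t := range tasks { // ends once tasks is closed and drained
				t()
			}
		}()
	}
	for i := 0; i < n; i++ {
		tasks <- func() {
			time.Sleep(10 * time.Millisecond) // simulated work
			atomic.AddInt32(&completed, 1)
		}
	}
	close(tasks) // no more submissions
	wg.Wait()    // blocks until workers have finished in-flight tasks too
	return atomic.LoadInt32(&completed)
}

func main() {
	fmt.Println(runAll(3, 10)) // prints 10
}
```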
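func (pool *GorotinesPool) Wait() {
	// Stop accepting new tasks.
	pool.status = false
	// Poll until the queue is drained or no workers remain. Tasks already
	// taken by workers may still be running when this loop exits.
	for len(pool.tasks) > 0 && atomic.LoadInt32(&pool.active) > 0 {
		ftool.Sleep(1000)
	}
	log.Printf("receive: %d, execute: %d",
		atomic.LoadInt32(&pool.ReceiveTotal), atomic.LoadInt32(&pool.ExecuteTotal))
	// Closing the channel ends every worker's range loop.
	close(pool.tasks)
}
Task Execution API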
func (pool *GorotinesPool) Execute(t func()) error {
	if pool.status {
		select {
		case pool.tasks <- func() taskType { t(); return normal }:
			atomic.AddInt32(&pool.ReceiveTotal, 1)
			return nil
		case <-time.After(pool.addTimeout):
			return errors.New("add tasks timeout")
		}
	}
	return errors.New("pools is down")
}
Self‑Test
A simple test creates a pool with a large maximum size, submits three tasks, sleeps, then calls Wait. The log output shows that the pool automatically increased the number of goroutines beyond the initial minimum, confirming the dynamic scaling logic.
func TestPool(t *testing.T) {
	// max 1000 workers, min 1 worker, queue capacity 200, 1-second submit timeout
	pool := execute.GetPool(1000, 1, 200, 1)
	for i := 0; i < 3; i++ {
		i := i // capture the per-iteration value (needed before Go 1.22)
		if err := pool.Execute(func() {
			log.Println(i)
			ftool.Sleep(1000)
		}); err != nil {
			t.Error(err)
		}
	}
	ftool.Sleep(3000)
	pool.Wait()
	log.Printf("T : %d", pool.ExecuteTotal)
	log.Printf("R : %d", pool.ReceiveTotal)
	log.Printf("max : %d", pool.Max)
	log.Printf("min : %d", pool.Min)
}
The test output demonstrates that the pool grew to handle the load, then shut down cleanly.
Conclusion
The author’s Go coroutine pool reproduces the essential behavior of Java’s ThreadPoolExecutor while taking advantage of Go’s lightweight goroutine model. It supports configurable core/max sizes, dynamic scaling based on queue length, timeout‑aware task submission, and graceful shutdown, making it suitable for high‑throughput performance testing scenarios.
