Designing a Go Coroutine Pool Based on Java ThreadPoolExecutor

This article explains how to implement a high‑performance Go coroutine pool by adapting key features of Java’s ThreadPoolExecutor, covering core parameters, task queue design, dynamic scaling, worker management, timeout handling, and provides complete code examples and self‑test results.

FunTester
FunTester
FunTester
Designing a Go Coroutine Pool Based on Java ThreadPoolExecutor

Background and Motivation

Performance testing requires concurrent execution of many test cases to simulate real‑world load. While Java’s java.util.concurrent.ThreadPoolExecutor is a classic solution, the author wanted a similar, lightweight pool for Go, leveraging goroutine efficiency while adding scheduling and load‑protection capabilities.

Key Features of Java ThreadPoolExecutor

The original executor defines several important parameters:

Core pool size and maximum pool size – control the number of threads.

Keep‑alive time and its time unit – recycle idle threads.

Work queue – holds pending tasks.

Thread factory and rejection handler – omitted in the Go version because they are rarely needed.

Design of the Go Coroutine Pool

The Go pool mirrors the Java design with two main attributes (core and max) and introduces a waiting queue, task type discrimination, timeout handling, active‑goroutine counting, and pool state management.

Data Structures

type GorotinesPool struct {
    Max          int
    Min          int
    tasks        chan func() taskType
    status       bool
    active       int32
    ReceiveTotal int32
    ExecuteTotal int32
    addTimeout   time.Duration
}

type taskType int

const (
    normal taskType = 0
    reduce taskType = 1
)

Constructor

The constructor creates the channel for tasks, starts the minimum number of workers, and launches a background balancer goroutine.

func GetPool(max, min, maxWaitTask, timeout int) *GorotinesPool {
    p := &GorotinesPool{
        Max:          max,
        Min:          min,
        tasks:        make(chan func() taskType, maxWaitTask),
        status:       true,
        active:       0,
        ReceiveTotal: 0,
        ExecuteTotal: 0,
        addTimeout:   time.Duration(timeout) * time.Second,
    }
    for i := 0; i < min; i++ {
        atomic.AddInt32(&p.active, 1)
        go p.worker()
    }
    go func() {
        for {
            if !p.status { break }
            ftool.Sleep(1000)
            p.balance()
        }
    }()
    return p
}

Dynamic Scaling (Add/Reduce Workers)

func (pool *GorotinesPool) AddWorker() {
    atomic.AddInt32(&pool.active, 1)
    go pool.worker()
}

func (pool *GorotinesPool) ReduceWorker() {
    atomic.AddInt32(&pool.active, -1)
    pool.tasks <- func() taskType { return reduce }
}

func (pool *GorotinesPool) balance() {
    if pool.status {
        if len(pool.tasks) > 0 && pool.active < int32(pool.Max) {
            pool.AddWorker()
        }
        if len(pool.tasks) == 0 && pool.active > int32(pool.Min) {
            pool.ReduceWorker()
        }
    }
}

Worker Loop

func (pool *GorotinesPool) worker() {
    defer func() {
        if p := recover(); p != nil {
            log.Printf("execute task fail: %v", p)
        }
    }()
Fun:
    for t := range pool.tasks {
        atomic.AddInt32(&pool.ExecuteTotal, 1)
        switch t() {
        case normal:
            atomic.AddInt32(&pool.active, -1)
        case reduce:
            if pool.active > int32(pool.Min) {
                break Fun
            }
        }
    }
}

Graceful Shutdown and Wait

func (pool *GorotinesPool) Wait() {
    pool.status = false
    for {
        if len(pool.tasks) == 0 || pool.active == 0 {
            break
        }
        ftool.Sleep(1000)
    }
    defer close(pool.tasks)
    log.Printf("receive: %d, execute: %d", pool.ReceiveTotal, pool.ExecuteTotal)
}

Task Execution API

func (pool *GorotinesPool) Execute(t func()) error {
    if pool.status {
        select {
        case pool.tasks <- func() taskType { t(); return normal }:
            atomic.AddInt32(&pool.ReceiveTotal, 1)
            return nil
        case <-time.After(pool.addTimeout):
            return errors.New("add tasks timeout")
        }
    }
    return errors.New("pools is down")
}

Self‑Test

A simple test creates a pool with a large maximum size, submits three tasks, sleeps, then calls Wait. The log output shows that the pool automatically increased the number of goroutines beyond the initial minimum, confirming the dynamic scaling logic.

func TestPool(t *testing.T) {
    pool := execute.GetPool(1000, 1, 200, 1)
    for i := 0; i < 3; i++ {
        pool.Execute(func() {
            log.Println(i)
            ftool.Sleep(1000)
        })
    }
    ftool.Sleep(3000)
    pool.Wait()
    log.Printf("T : %d", pool.ExecuteTotal)
    log.Printf("R : %d", pool.ReceiveTotal)
    log.Printf("max : %d", pool.Max)
    log.Printf("min : %d", pool.Min)
}

The test output demonstrates that the pool grew to handle the load, then shut down cleanly.

Conclusion

The author’s Go coroutine pool reproduces the essential behavior of Java’s ThreadPoolExecutor while taking advantage of Go’s lightweight goroutine model. It supports configurable core/max sizes, dynamic scaling based on queue length, timeout‑aware task submission, and graceful shutdown, making it suitable for high‑throughput performance testing scenarios.

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

BackendGoThreadPoolcoroutine
FunTester
Written by

FunTester

10k followers, 1k articles | completely useless

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.