Concurrent Task Processing in Go: Channels, WaitGroup, Errors & Timeouts

This article demonstrates how to launch ten concurrent Go tasks using channels and a WaitGroup, capture each task's result and error, handle timeouts and panics, and preserve the original order of results before returning a unified response.

MaGe Linux Operations
MaGe Linux Operations
MaGe Linux Operations
Concurrent Task Processing in Go: Channels, WaitGroup, Errors & Timeouts

Requirement

When an API returns a list of ten elements, we need to execute ten tasks concurrently, collect each task's result and any exception, and assemble them into a slice that is returned as a single response.

Struct for Goroutine Processing

type Order struct {
    Name string `json:"name"`
    Id   int    `json:"id"`
}

Determine Channel Count

Usually the number of channels matches the number of elements to process.

taskNum := 10

Initialize Channels

orderCh := make(chan Order, taskNum) // receives results
errCh   := make(chan error, taskNum) // receives errors

Launch Tasks with sync.WaitGroup

wg := sync.WaitGroup{}
for i := 0; i < taskNum; i++ {
    wg.Add(1)
    go func() {
        defer wg.Done()
        if i == 3 { // simulate an error at i=3
            errCh <- errors.New("there is an error")
            return
        }
        res := Order{Name: "num: " + strconv.Itoa(i), Id: i}
        orderCh <- res
    }()
}
wg.Wait() // wait for all tasks

Receive Results with for‑select

orderList := make([]Order, taskNum)
for i := 0; i < taskNum; i++ {
    select {
    case order, ok := <-orderCh:
        if ok {
            orderList = append(orderList, order)
        }
    case err := <-errCh:
        if err != nil {
            return err // stop on first error
        }
    default:
        fmt.Println("done")
    }
}
close(orderCh)
close(errCh)

Timeout Handling

Each task must finish within a defined time; a timer is used to enforce this.

timeoutTime := time.Second * 3
taskTimer := time.NewTimer(timeoutTime)
for i := 0; i < taskNum; i++ {
    select {
    // ... other cases ...
    case <-taskTimer.C:
        err := errors.New("task timeout")
        return err
    }
    taskTimer.Reset(timeoutTime)
}

Panic Recovery in Goroutine

go func() {
    defer func() {
        wg.Done()
        if r := recover(); r != nil {
            err := errors.New(fmt.Sprintf("System panic:%v", r))
            errCh <- err
            return
        }
    }()
    // task logic
}()

Preserving Order

When the output order must match the input order, a struct with a sequence field is used.

type OrderWithSeq struct {
    Seq       int
    OrderItem Order
}

type BySeq []OrderWithSeq
func (a BySeq) Len() int           { return len(a) }
func (a BySeq) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a BySeq) Less(i, j int) bool { return a[i].Seq < a[j].Seq }

orderCh := make(chan OrderWithSeq, taskNum)
// send OrderWithSeq{Seq: i, OrderItem: res}
// receive into orderSeqList, then sort.Sort(BySeq(orderSeqList))

Complete Template

type Order struct {
    Name string `json:"name"`
    Id   int    `json:"id"`
}

type OrderWithSeq struct {
    Seq       int
    OrderItem Order
}

type BySeq []OrderWithSeq
func (a BySeq) Len() int           { return len(a) }
func (a BySeq) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a BySeq) Less(i, j int) bool { return a[i].Seq < a[j].Seq }

taskNum := 10
orderCh := make(chan OrderWithSeq, taskNum)
errCh   := make(chan error, taskNum)
wg := sync.WaitGroup{}
for i := 0; i < taskNum; i++ {
    wg.Add(1)
    go func(i int) {
        defer func() {
            wg.Done()
            if r := recover(); r != nil {
                errCh <- errors.New(fmt.Sprintf("System panic:%v", r))
            }
        }()
        res := Order{Name: "num: " + strconv.Itoa(i), Id: i}
        orderCh <- OrderWithSeq{Seq: i, OrderItem: res}
    }(i)
}
wg.Wait()
orderSeqList := make([]OrderWithSeq, taskNum)
timeoutTime := time.Second * 3
taskTimer := time.NewTimer(timeoutTime)
for i := 0; i < taskNum; i++ {
    select {
    case order, ok := <-orderCh:
        if ok { orderSeqList = append(orderSeqList, order) }
    case err := <-errCh:
        if err != nil { return err }
    case <-taskTimer.C:
        return errors.New("task timeout")
    default:
        fmt.Println("done")
    }
    taskTimer.Reset(timeoutTime)
}
close(orderCh)
close(errCh)
sort.Sort(BySeq(orderSeqList))
// assemble final ordered result
Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

concurrencyTimeoutChannelsWaitGrouppanic recovery
MaGe Linux Operations
Written by

MaGe Linux Operations

Founded in 2009, MaGe Education is a top Chinese high‑end IT training brand. Its graduates earn 12K+ RMB salaries, and the school has trained tens of thousands of students. It offers high‑pay courses in Linux cloud operations, Python full‑stack, automation, data analysis, AI, and Go high‑concurrency architecture. Thanks to quality courses and a solid reputation, it has talent partnerships with numerous internet firms.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.