
Concurrent Chunk Processing in Go: A MapReduce‑Style Solution

This article explains how to handle business scenarios that split a large data set into concurrent I/O requests followed by serialized aggregation. It presents a Go chunk-processing framework with map and reduce functions, configurable concurrency, and example code.

Rare Earth Juejin Tech Community

In everyday backend development we often face situations that need concurrent processing, such as querying a database with a large list of IDs (splitting the list into batches of about 50) or calling external APIs that do not support bulk operations.

These tasks can be divided into two stages: the request stage, which is mainly I/O (database or external API calls), and the processing stage, which transforms, filters, and aggregates the returned data. Issuing the requests one after another adds their latencies together, while issuing them concurrently can reduce the total significantly.

The request stage can be parallelized because the I/O operations are independent of one another, whereas the processing stage must handle one result at a time so that aggregation into shared state stays correct. Together the two stages form a special kind of MapReduce pattern.

The essential components for solving this pattern are: the total list length, a map function that runs concurrently, a reduce function that runs one call at a time, the maximum number of concurrent workers, the chunk size, and the derived number of sub‑tasks (list length ÷ chunk size, rounded up so that a final partial chunk is counted).
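The chunk arithmetic can be sketched on its own. This is a minimal illustration, not the article's code: chunkCount and chunkBounds are hypothetical helper names, and the rounding is done with integer arithmetic instead of floating point.

```go
package main

import "fmt"

// chunkCount returns ceil(length / chunkSize) using integer arithmetic.
// It returns 0 for an empty list or a non-positive chunk size.
func chunkCount(length, chunkSize int) int {
	if length < 1 || chunkSize < 1 {
		return 0
	}
	return (length + chunkSize - 1) / chunkSize
}

// chunkBounds returns the half-open range [start, end) of the i-th chunk.
// The last chunk is clipped to the list length.
func chunkBounds(i, length, chunkSize int) (start, end int) {
	start = i * chunkSize
	end = start + chunkSize
	if end > length {
		end = length
	}
	return start, end
}

func main() {
	length, chunkSize := 120, 50
	for i := 0; i < chunkCount(length, chunkSize); i++ {
		start, end := chunkBounds(i, length, chunkSize)
		fmt.Printf("chunk %d: [%d, %d)\n", i, start, end)
	}
}
```

With 120 items and a chunk size of 50, this yields three chunks: [0, 50), [50, 100), and [100, 120).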

Below is the Go function signature that captures this idea:

func ChunkProcess(length int, procedure func(start, end int) (interface{}, error),
    reduce func(partialResult interface{}, partialErr error, start, end int), maxConcurrent int, chunkSize int)
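To make the roles of the two callbacks concrete, here is a minimal sketch of a procedure/reduce pair that matches the signature. Everything in it is an assumption made for illustration: queryNames stands in for a batched database or API call, and collect drives the pair serially only to show the contract, so it is not the scheduler described in this article.

```go
package main

import (
	"errors"
	"fmt"
)

// queryNames is a hypothetical stand-in for a batched database or API call.
func queryNames(ids []int64) (map[int64]string, error) {
	out := make(map[int64]string, len(ids))
	for _, id := range ids {
		if id < 0 {
			return nil, errors.New("invalid id")
		}
		out[id] = fmt.Sprintf("user-%d", id)
	}
	return out, nil
}

// collect calls a procedure/reduce pair chunk by chunk (serially, for clarity)
// and returns the merged result together with the first error seen.
func collect(ids []int64, chunkSize int) (map[int64]string, error) {
	if chunkSize < 1 {
		chunkSize = 1
	}
	names := make(map[int64]string, len(ids))
	var firstErr error

	// procedure: the I/O stage for the half-open range [start, end).
	procedure := func(start, end int) (interface{}, error) {
		return queryNames(ids[start:end])
	}
	// reduce: the aggregation stage. It checks the error before asserting the type.
	reduce := func(partialResult interface{}, partialErr error, start, end int) {
		if partialErr != nil {
			if firstErr == nil {
				firstErr = fmt.Errorf("chunk [%d,%d): %w", start, end, partialErr)
			}
			return
		}
		for id, name := range partialResult.(map[int64]string) {
			names[id] = name
		}
	}

	for start := 0; start < len(ids); start += chunkSize {
		end := start + chunkSize
		if end > len(ids) {
			end = len(ids)
		}
		res, err := procedure(start, end)
		reduce(res, err, start, end)
	}
	return names, firstErr
}

func main() {
	names, err := collect([]int64{1, 2, 3, 4, 5}, 2)
	fmt.Println(len(names), err)
}
```

Because the partial result arrives as interface{}, the reduce function is where the type assertion and the error check belong; a failed chunk is skipped here, and only the first error is kept.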

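The core logic runs the map function serially when the maximum concurrency is at most 1 or the whole list fits in a single chunk, and concurrently otherwise. In the concurrent path, sync.Mutex protects the reduce step, sync.WaitGroup waits for all workers, and a buffered channel throttles concurrency. Reduce calls therefore never overlap, but they arrive in the order in which chunks finish, which is not necessarily chunk order.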
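The three synchronization primitives can be seen working together in a much smaller setting. The following is a minimal sketch under stated assumptions: sumSquares is a hypothetical function, and squaring a number stands in for the I/O step.

```go
package main

import (
	"fmt"
	"sync"
)

// sumSquares squares each input in its own goroutine, with at most
// maxConcurrent goroutines in flight, and adds the results under a mutex.
func sumSquares(inputs []int, maxConcurrent int) int {
	if maxConcurrent < 1 {
		maxConcurrent = 1
	}
	var (
		mu    sync.Mutex
		wg    sync.WaitGroup
		total int
	)
	throttle := make(chan struct{}, maxConcurrent) // capacity = concurrency limit

	for _, n := range inputs {
		throttle <- struct{}{} // blocks while maxConcurrent workers are running
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			defer func() { <-throttle }() // free the slot when the worker exits

			sq := n * n // stands in for the concurrent I/O step

			mu.Lock() // the aggregation step runs one worker at a time
			total += sq
			mu.Unlock()
		}(n)
	}
	wg.Wait() // all workers have finished, so total is safe to read
	return total
}

func main() {
	fmt.Println(sumSquares([]int{1, 2, 3, 4}, 2)) // 1 + 4 + 9 + 16 = 30
}
```

Sending into the buffered channel before starting a goroutine means the loop itself pauses once the limit is reached, so no more than maxConcurrent workers exist at any moment.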

Implementation:

package test

import (
    "math"
    "sync"
)

func ChunkProcess(length int, procedure func(start, end int) (interface{}, error),
    reduce func(partialResult interface{}, partialErr error, start, end int), maxConcurrent int, chunkSize int) {
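    // Nothing to do for an empty list; a chunk size below 1 would never make progress.
    if length < 1 || chunkSize < 1 {
        return
    }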
    if maxConcurrent <= 1 || length <= chunkSize {
        doChunkProcessSerially(length, procedure, reduce, chunkSize)
    } else {
        doChunkProcessConcurrently(length, procedure, reduce, maxConcurrent, chunkSize)
    }
}

// Serial processing
func doChunkProcessSerially(length int, procedure func(start, end int) (interface{}, error),
    reduce func(partialResult interface{}, partialErr error, start, end int), chunkSize int) {
    chunkNums := int(math.Ceil(float64(length) / float64(chunkSize)))
    for i := 0; i < chunkNums; i++ {
        func(chunkIndex int) {
            defer func() {
                if err := recover(); err != nil {
                    // custom error handling
                }
            }()
            start := chunkIndex * chunkSize
            end := start + chunkSize
            if end > length {
                end = length
            }
            response, err := procedure(start, end)
            if reduce != nil {
                reduce(response, err, start, end)
            }
        }(i)
    }
}

// Concurrent processing
func doChunkProcessConcurrently(length int, procedure func(start, end int) (interface{}, error),
    reduce func(partialResult interface{}, partialErr error, start, end int), maxConcurrent int, chunkSize int) {
    index := 0
    chunkIndex := 0
    lengthTask := int(math.Ceil(float64(length) / float64(chunkSize)))
    var lock sync.Mutex
    var wg sync.WaitGroup
    wg.Add(lengthTask)
    throttleChan := make(chan struct{}, maxConcurrent)

    for {
        start := index
        end := index + chunkSize
        if end > length {
            end = length
        }
        throttleChan <- struct{}{}
        go func(chunkIndex int) {
            defer func() {
                <-throttleChan
                if err := recover(); err != nil {
                    // custom error handling
                }
                wg.Done()
            }()
            response, err := procedure(start, end)
            if reduce != nil {
                lock.Lock()
                defer lock.Unlock()
                reduce(response, err, start, end)
            }
        }(chunkIndex)
        chunkIndex++
        index = index + chunkSize
        if index >= length {
            break
        }
    }
    wg.Wait()
    close(throttleChan)
}

Test case demonstrating usage:

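package test

import (
    "fmt"
    "testing"
)

func TestChunkProcess(t *testing.T) {
    trackIDs := []int64{123, 456, 789}
    results := make([]int64, 0, len(trackIDs))
    // chunkSize is 1, so each call to the map function handles exactly one ID.
    ChunkProcess(len(trackIDs), func(start, end int) (interface{}, error) {
        result := trackIDs[start] + 100
        return result, nil
    }, func(partialResult interface{}, partialErr error, start, end int) {
        // reduce runs under the framework's mutex, so appending here is safe.
        results = append(results, partialResult.(int64))
    }, 2, 1)
    // Chunks finish in nondeterministic order, so the order of results may vary.
    fmt.Println(results)
}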
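When the output must line up with the input, the start and end arguments passed to reduce can be used to write each partial result into its own position instead of appending. The sketch below illustrates that idea under stated assumptions: runChunks is a simplified stand-in for ChunkProcess (no throttling or panic recovery) that exists only to make the example self-contained, and addHundred is a hypothetical name.

```go
package main

import (
	"fmt"
	"sync"
)

// runChunks is a simplified stand-in for the article's ChunkProcess: one
// goroutine per chunk, with reduce calls serialized by a mutex.
func runChunks(length, chunkSize int,
	procedure func(start, end int) (interface{}, error),
	reduce func(partialResult interface{}, partialErr error, start, end int)) {
	if length < 1 || chunkSize < 1 {
		return
	}
	var mu sync.Mutex
	var wg sync.WaitGroup
	for start := 0; start < length; start += chunkSize {
		end := start + chunkSize
		if end > length {
			end = length
		}
		wg.Add(1)
		go func(start, end int) {
			defer wg.Done()
			res, err := procedure(start, end)
			mu.Lock()
			defer mu.Unlock()
			reduce(res, err, start, end)
		}(start, end)
	}
	wg.Wait()
}

// addHundred returns a slice whose i-th element is ids[i] + 100.
func addHundred(ids []int64, chunkSize int) []int64 {
	results := make([]int64, len(ids)) // preallocated: one slot per input
	runChunks(len(ids), chunkSize, func(start, end int) (interface{}, error) {
		part := make([]int64, 0, end-start)
		for _, id := range ids[start:end] {
			part = append(part, id+100)
		}
		return part, nil
	}, func(partialResult interface{}, partialErr error, start, end int) {
		if partialErr != nil {
			return // a real reduce would record the error
		}
		// Write by position instead of appending, so completion order is irrelevant.
		copy(results[start:end], partialResult.([]int64))
	})
	return results
}

func main() {
	fmt.Println(addHundred([]int64{123, 456, 789}, 2)) // [223 556 889]
}
```

This variant also lets the map function process a whole chunk at once, so it works for any chunk size, not only a chunk size of 1.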

By abstracting these business scenarios into a generic chunk‑processing framework, developers obtain a reusable solution for a wide range of concurrent I/O workloads.
