
Implementing a Concurrent-Safe Queue in Go

This article explains Go's concurrency safety mechanisms, demonstrates how to build a thread‑safe queue using mutexes, sync.Cond, and context cancellation, and provides complete example code and tests to illustrate proper synchronization and resource management.

New Oriental Technology

When multiple goroutines access and modify shared data concurrently, race conditions can occur. Go addresses this with two main approaches: locking and channels, both of which serialize access to ensure safety.

The official Go documentation advises protecting shared data with channel operations or synchronization primitives from the sync and sync/atomic packages, and warns against overly clever solutions.
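For context, a buffered channel already behaves like a bounded, blocking, FIFO queue out of the box; this is the channel-based alternative the documentation alludes to. A minimal sketch:

```go
package main

import "fmt"

func main() {
	q := make(chan int, 2) // bounded queue with capacity 2

	q <- 1 // "Add": blocks when the buffer is full
	q <- 2

	fmt.Println(<-q) // "Pop": blocks when the buffer is empty; prints 1
	fmt.Println(<-q) // prints 2
}
```

The rest of the article builds the same behavior by hand with locks and condition variables, which makes the synchronization explicit and easier to study.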

To illustrate these concepts, we implement a concurrent‑safe queue in Go. A basic queue requires Add and Pop operations, but the underlying slice is not safe for concurrent use, so we first protect these methods with a sync.Mutex lock.

type Queue struct {
    elements []interface{}
    lock     sync.Mutex // zero value is ready to use; a *sync.Mutex would need explicit initialization
}

func (q *Queue) Size() int { return len(q.elements) }

func (q *Queue) Pop() (ele interface{}) {
    q.lock.Lock()
    defer q.lock.Unlock()

    if q.Size() == 0 {
        return nil // nothing to pop
    }
    ele = q.elements[q.Size()-1]
    q.elements = q.elements[:q.Size()-1]
    return
}

func (q *Queue) Add(ele interface{}) {
    q.lock.Lock()
    defer q.lock.Unlock()

    q.elements = append(q.elements, ele)
}

Real‑world queues need bounded capacity, which requires blocking behavior when the queue is empty (for Pop) or full (for Add). This is achieved with sync.Cond, analogous to Java's built‑in condition queues: two condition variables share a single mutex.

type Queue struct {
    elements []interface{}
    capacity int

    // condition for not‑empty
    notEmptyCond *sync.Cond
    // condition for not‑full
    notFullCond *sync.Cond
}

func (q *Queue) Size() int { return len(q.elements) }
func (q *Queue) isFull() bool { return q.Size() >= q.capacity }
func (q *Queue) isEmpty() bool { return q.Size() == 0 }

func (q *Queue) Pop() (ele interface{}) {
    q.notEmptyCond.L.Lock()
    defer q.notEmptyCond.L.Unlock()

    for q.isEmpty() {
        q.notEmptyCond.Wait()
    }
    ele = q.elements[q.Size()-1]
    q.elements = q.elements[:q.Size()-1]
    q.notFullCond.Signal()
    return
}

func (q *Queue) Add(ele interface{}) {
    q.notEmptyCond.L.Lock()
    defer q.notEmptyCond.L.Unlock()

    for q.isFull() {
        q.notFullCond.Wait()
    }
    q.elements = append(q.elements, ele)
    q.notEmptyCond.Signal()
}

func NewQueue(capacity int) *Queue {
    var lock sync.Mutex
    return &Queue{
        elements:     make([]interface{}, 0, capacity),
        capacity:     capacity,
        notEmptyCond: sync.NewCond(&lock),
        notFullCond:  sync.NewCond(&lock),
    }
}
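To see the blocking behavior in action, the sketch below fills a capacity-2 queue, lets a producer block on Add, then frees a slot with Pop. Note that Pop removes the most recently added element, so ordering is LIFO. The queue code is reproduced so the sketch runs standalone:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Bounded queue, reproduced from above.
type Queue struct {
	elements     []interface{}
	capacity     int
	notEmptyCond *sync.Cond
	notFullCond  *sync.Cond
}

func NewQueue(capacity int) *Queue {
	var lock sync.Mutex
	return &Queue{
		elements:     make([]interface{}, 0, capacity),
		capacity:     capacity,
		notEmptyCond: sync.NewCond(&lock),
		notFullCond:  sync.NewCond(&lock),
	}
}

func (q *Queue) Size() int     { return len(q.elements) }
func (q *Queue) isFull() bool  { return q.Size() >= q.capacity }
func (q *Queue) isEmpty() bool { return q.Size() == 0 }

func (q *Queue) Pop() (ele interface{}) {
	q.notEmptyCond.L.Lock()
	defer q.notEmptyCond.L.Unlock()
	for q.isEmpty() {
		q.notEmptyCond.Wait()
	}
	ele = q.elements[q.Size()-1]
	q.elements = q.elements[:q.Size()-1]
	q.notFullCond.Signal()
	return
}

func (q *Queue) Add(ele interface{}) {
	q.notEmptyCond.L.Lock() // both conditions share this mutex
	defer q.notEmptyCond.L.Unlock()
	for q.isFull() {
		q.notFullCond.Wait()
	}
	q.elements = append(q.elements, ele)
	q.notEmptyCond.Signal()
}

func main() {
	q := NewQueue(2)
	q.Add(1)
	q.Add(2) // queue is now at capacity
	done := make(chan struct{})
	go func() {
		q.Add(3) // blocks until Pop frees a slot
		close(done)
	}()
	time.Sleep(50 * time.Millisecond) // give the producer time to block
	fmt.Println(q.Pop())              // pops 2 (LIFO) and signals not-full
	<-done
	fmt.Println(q.Size()) // 2: elements 1 and 3 remain
}
```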

A goroutine blocked in Cond.Wait() has no way to give up on its own: if no Signal ever arrives, it leaks. To mitigate this, we introduce a cancellable wait built on context.Context, allowing the wait to be abandoned on timeout or cancellation.

// waitWithCancel blocks on cond.Wait but aborts early if ctx is canceled.
// errors.Wrap comes from github.com/pkg/errors.
func waitWithCancel(ctx context.Context, cond *sync.Cond) error {
    // ctx.Done() is nil for contexts that can never be canceled
    // (e.g. context.Background()), so the extra goroutine is only
    // spawned when cancellation is actually possible.
    if ctx.Done() != nil {
        done := make(chan struct{})
        go func() {
            cond.Wait() // releases cond.L while parked, re-acquires it on wake-up
            close(done)
        }()
        select {
        case <-ctx.Done():
            // The helper goroutine is still parked in Wait at this point;
            // the caller must not Unlock cond.L after a canceled wait.
            return errors.Wrap(ctx.Err(), "cancel wait")
        case <-done:
            return nil
        }
    }
    cond.Wait()
    return nil
}

Using this helper, the queue’s Pop and Add methods accept a context.Context parameter, enabling timeout or cancellation handling and preventing leaked goroutines.

func (q *Queue) Pop(ctx context.Context) (ele interface{}, err error) {
    q.notEmptyCond.L.Lock()
    defer func() {
        // After a canceled wait, the helper goroutine inside waitWithCancel
        // is still parked in Wait (which released the lock), so only Unlock
        // when the wait completed normally.
        if originalErr := errors.Cause(err); originalErr != context.DeadlineExceeded && originalErr != context.Canceled {
            q.notEmptyCond.L.Unlock()
        }
    }()
    for q.isEmpty() {
        if err = waitWithCancel(ctx, q.notEmptyCond); err != nil {
            return
        }
    }
    ele = q.elements[q.Size()-1]
    q.elements = q.elements[:q.Size()-1]
    q.notFullCond.Signal()
    return
}

func (q *Queue) Add(ctx context.Context, ele interface{}) (err error) {
    q.notEmptyCond.L.Lock() // both conditions share the same underlying mutex
    defer func() {
        if originalErr := errors.Cause(err); originalErr != context.DeadlineExceeded && originalErr != context.Canceled {
            q.notEmptyCond.L.Unlock()
        }
    }()
    for q.isFull() {
        if err = waitWithCancel(ctx, q.notFullCond); err != nil {
            return
        }
    }
    q.elements = append(q.elements, ele)
    q.notEmptyCond.Signal()
    return
}

Test cases demonstrate that without cancellation the Pop call blocks indefinitely, while with a timeout context the operation aborts cleanly, confirming that the queue behaves correctly under concurrent access and cancellation scenarios.
