Implementing a Concurrent-Safe Queue in Go
This article explains Go's concurrency safety mechanisms, demonstrates how to build a thread‑safe queue using mutexes, sync.Cond, and context cancellation, and provides complete example code and tests to illustrate proper synchronization and resource management.
When multiple goroutines access and modify shared data concurrently, race conditions can occur. Go addresses this with two main approaches, locking and channels, both of which serialize access to the shared data.
The official Go documentation advises protecting shared data with channel operations or synchronization primitives from the sync and sync/atomic packages, and warns against overly clever solutions.
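As a rough illustration of these options (not code from the original article), the sketch below increments one counter from many goroutines in three ways: under a sync.Mutex, with sync/atomic, and by sending increments over a channel to a single goroutine that owns the total. The function names are invented for this example.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// countWithMutex increments a shared counter from n goroutines under a mutex.
func countWithMutex(n int) int {
	var (
		mu    sync.Mutex
		wg    sync.WaitGroup
		total int
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			mu.Lock()
			total++
			mu.Unlock()
		}()
	}
	wg.Wait()
	return total
}

// countWithAtomic does the same with an atomic add instead of a lock.
func countWithAtomic(n int) int {
	var (
		wg    sync.WaitGroup
		total int64
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			atomic.AddInt64(&total, 1)
		}()
	}
	wg.Wait()
	return int(atomic.LoadInt64(&total))
}

// countWithChannel lets one goroutine own the total; the others only send to it.
func countWithChannel(n int) int {
	incs := make(chan int)
	result := make(chan int)
	go func() {
		total := 0
		for d := range incs {
			total += d
		}
		result <- total
	}()
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			incs <- 1
		}()
	}
	wg.Wait()
	close(incs) // no more senders, so the owner can report the total
	return <-result
}

func main() {
	fmt.Println(countWithMutex(1000), countWithAtomic(1000), countWithChannel(1000)) // 1000 1000 1000
}
```

All three variants serialize the update; they differ only in whether the serialization comes from a lock, a hardware-level atomic operation, or a single owning goroutine.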
To illustrate these concepts, we implement a concurrent‑safe queue in Go. A basic queue requires Add and Pop operations, but the underlying slice is not safe for concurrent use, so we first protect these methods with a sync.Mutex lock.
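type Queue struct {
	elements []interface{}
	lock     sync.Mutex // the zero value is an unlocked mutex, so no constructor is needed
}

// Pop removes and returns the element at the head of the queue,
// or nil if the queue is empty.
func (q *Queue) Pop() (ele interface{}) {
	q.lock.Lock()
	defer q.lock.Unlock()
	if len(q.elements) == 0 {
		return nil
	}
	ele = q.elements[0]
	q.elements = q.elements[1:]
	return
}

// Add appends an element to the tail of the queue.
func (q *Queue) Add(ele interface{}) {
	q.lock.Lock()
	defer q.lock.Unlock()
	q.elements = append(q.elements, ele)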
}

Real‑world queues usually need a bounded capacity, so Pop must block while the queue is empty and Add must block while it is full. This is achieved with sync.Cond, analogous to Java’s built‑in condition queues, using a single mutex shared by two condition variables.
type Queue struct {
	elements []interface{}
	capacity int
	// signaled when the queue becomes non-empty
	notEmptyCond *sync.Cond
	// signaled when the queue becomes non-full
	notFullCond *sync.Cond
}

func (q *Queue) Size() int     { return len(q.elements) }
func (q *Queue) isFull() bool  { return q.Size() >= q.capacity }
func (q *Queue) isEmpty() bool { return q.Size() == 0 }

// Pop blocks while the queue is empty, then removes and returns the head element.
func (q *Queue) Pop() (ele interface{}) {
	q.notEmptyCond.L.Lock()
	defer q.notEmptyCond.L.Unlock()
	// The condition may no longer hold when Wait returns, so re-check it in a loop.
	for q.isEmpty() {
		q.notEmptyCond.Wait()
	}
	ele = q.elements[0]
	q.elements = q.elements[1:]
	q.notFullCond.Signal()
	return
}

// Add blocks while the queue is full, then appends ele at the tail.
func (q *Queue) Add(ele interface{}) (err error) {
	// Both conditions share one mutex, so locking through either is equivalent.
	q.notEmptyCond.L.Lock()
	defer q.notEmptyCond.L.Unlock()
	for q.isFull() {
		q.notFullCond.Wait()
	}
	q.elements = append(q.elements, ele)
	q.notEmptyCond.Signal()
	return
}

// NewQueue returns an empty queue that holds at most capacity elements.
func NewQueue(capacity int) *Queue {
	var lock sync.Mutex
	return &Queue{
		elements:     make([]interface{}, 0, capacity),
		capacity:     capacity,
		notEmptyCond: sync.NewCond(&lock),
		notFullCond:  sync.NewCond(&lock),
	}
}

A goroutine blocked in Cond.Wait() can leak if its parent goroutine exits unexpectedly and no signal ever arrives. To mitigate this, we introduce a cancellable wait using context.Context, allowing the wait to be aborted.
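// waitWithCancel waits on cond until it is signaled or ctx is done.
// The caller must hold cond.L. On a nil return the lock is held again;
// on a context error the lock has already been released, which is what
// the deferred unlock in Pop and Add expects.
// The errors package here is github.com/pkg/errors.
func waitWithCancel(ctx context.Context, cond *sync.Cond) error {
	if ctx.Done() == nil {
		// ctx can never be canceled, so a plain wait is enough.
		cond.Wait()
		return nil
	}
	stop := make(chan struct{})
	defer close(stop) // lets the watcher goroutine exit once we return
	go func() {
		select {
		case <-ctx.Done():
			// Wake the waiters so the canceled one can notice and return.
			cond.L.Lock()
			cond.Broadcast()
			cond.L.Unlock()
		case <-stop:
		}
	}()
	// Wait runs in the calling goroutine, so no goroutine stays parked on cond.
	cond.Wait()
	if err := ctx.Err(); err != nil {
		cond.Signal() // pass on a wake-up this waiter may have consumed
		cond.L.Unlock()
		return errors.Wrap(err, "cancel wait")
	}
	return nil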
}

Using this helper, the queue’s Pop and Add methods accept a context.Context parameter, enabling timeout or cancellation handling and preventing leaked goroutines.
// Pop blocks until an element is available or ctx is done.
func (q *Queue) Pop(ctx context.Context) (ele interface{}, err error) {
	q.notEmptyCond.L.Lock()
	defer func() {
		// waitWithCancel returns without the lock when ctx is done,
		// so unlock only on the other paths.
		if originalErr := errors.Cause(err); originalErr != context.DeadlineExceeded && originalErr != context.Canceled {
			q.notEmptyCond.L.Unlock()
		}
	}()
	for q.isEmpty() {
		err = waitWithCancel(ctx, q.notEmptyCond)
		if err != nil {
			return
		}
	}
	ele = q.elements[0]
	q.elements = q.elements[1:]
	q.notFullCond.Signal()
	return
}

// Add blocks until there is room for ele or ctx is done.
func (q *Queue) Add(ctx context.Context, ele interface{}) (err error) {
	q.notEmptyCond.L.Lock()
	defer func() {
		// Same rule as in Pop: the lock is already released on a context error.
		if originalErr := errors.Cause(err); originalErr != context.DeadlineExceeded && originalErr != context.Canceled {
			q.notEmptyCond.L.Unlock()
		}
	}()
	for q.isFull() {
		err = waitWithCancel(ctx, q.notFullCond)
		if err != nil {
			return
		}
	}
	q.elements = append(q.elements, ele)
	q.notEmptyCond.Signal()
	return
}

Test cases demonstrate that without cancellation the Pop call blocks indefinitely, while with a timeout context the operation aborts cleanly, confirming that the queue behaves correctly under concurrent access and cancellation scenarios.
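The timeout scenario can be sketched as a small self-contained program. This is a condensed variant written for illustration, not the article's exact code: the Queue type is trimmed down, the helper names wait and demo are invented here, and errors are wrapped with the standard library's fmt.Errorf and %w rather than a third-party errors package. One assumption worth flagging is that this wait helper always returns with the lock held, so a plain deferred Unlock is enough.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// Queue is a condensed bounded FIFO queue used only for this demo.
type Queue struct {
	mu       sync.Mutex
	notEmpty *sync.Cond
	notFull  *sync.Cond
	elements []interface{}
	capacity int
}

func NewQueue(capacity int) *Queue {
	q := &Queue{capacity: capacity}
	q.notEmpty = sync.NewCond(&q.mu)
	q.notFull = sync.NewCond(&q.mu)
	return q
}

// wait blocks on cond until it is signaled or ctx is done.
// The caller holds cond.L on entry and holds it again on return.
func wait(ctx context.Context, cond *sync.Cond) error {
	stop := make(chan struct{})
	defer close(stop) // lets the watcher goroutine exit once we return
	go func() {
		select {
		case <-ctx.Done():
			cond.L.Lock()
			cond.Broadcast() // wake waiters so the canceled one can return
			cond.L.Unlock()
		case <-stop:
		}
	}()
	cond.Wait()
	if err := ctx.Err(); err != nil {
		cond.Signal() // pass on a wake-up this waiter may have consumed
		return fmt.Errorf("cancel wait: %w", err)
	}
	return nil
}

// Pop blocks until an element is available or ctx is done.
func (q *Queue) Pop(ctx context.Context) (interface{}, error) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.elements) == 0 {
		if err := wait(ctx, q.notEmpty); err != nil {
			return nil, err
		}
	}
	ele := q.elements[0]
	q.elements = q.elements[1:]
	q.notFull.Signal()
	return ele, nil
}

// Add blocks until there is room for ele or ctx is done.
func (q *Queue) Add(ctx context.Context, ele interface{}) error {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.elements) >= q.capacity {
		if err := wait(ctx, q.notFull); err != nil {
			return err
		}
	}
	q.elements = append(q.elements, ele)
	q.notEmpty.Signal()
	return nil
}

// demo pops from an empty queue with a deadline, then reuses the queue.
func demo() (timedOut bool, value interface{}) {
	q := NewQueue(1)
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	_, err := q.Pop(ctx) // the queue is empty, so this waits until the deadline
	timedOut = errors.Is(err, context.DeadlineExceeded)

	// The mutex was released on the timeout path, so the queue is still usable.
	_ = q.Add(context.Background(), "a")
	value, _ = q.Pop(context.Background())
	return timedOut, value
}

func main() {
	timedOut, value := demo()
	fmt.Println(timedOut, value) // true a
}
```

Calling Pop with context.Background() on an empty queue would block until some other goroutine calls Add, which is the "blocks indefinitely" case; the deadline is what turns that into an error the caller can handle.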
New Oriental Technology