Backend Development 16 min read

Implementation and Usage of Queue and Simple Queue in the Go WorkQueue Project

This article explains the design, implementation, and usage of the Queue and Simple Queue modules in the Go-based WorkQueue project, covering their underlying data structures, interfaces, callback mechanisms, and practical code examples for both unique and duplicate data handling.

Rare Earth Juejin Tech Community
Rare Earth Juejin Tech Community
Rare Earth Juejin Tech Community
Implementation and Usage of Queue and Simple Queue in the Go WorkQueue Project

I am Lee, a veteran with 17 years of experience in the IT industry, presenting the second article of the WorkQueue project series, focusing on the core modules Queue and Simple Queue and how to use them.

Event Background

In everyday development, Queue structures are widely used, such as storing messages in a message queue, and many Kubernetes components (e.g., Controller 's WorkQueue , Scheduler 's SchedulingQueue , Kubelet 's PodWorkers ) rely on FIFO queues. The data can be either repeatable or guaranteed unique, and WorkQueue implements both scenarios using Queue and Simple Queue .

Queue Introduction

Both Queue and Simple Queue share a FIFO logical structure, but differ in implementation details. Queue ensures data uniqueness using a set and stores order with a deque , while Simple Queue allows duplicate data and relies solely on a deque .

1. Queue

1.1 Queue Overview

Queue is a FIFO queue that implements the standard Interface with methods such as Add , Get , Len , GetWithBlock , Done , Stop , and IsClosed . It uses a set to guarantee uniqueness and a deque to preserve order.

// Queue method interface
// Queue interface
type Interface interface {
	Add(element any) error // add element
	Len() int // queue length
	Get() (element any, err error) // get element
	GetWithBlock() (element any, err error) // block until element is available
	Done(element any) // mark processing complete
	Stop() // stop the queue
	IsClosed() bool // check if closed
}

The queue maintains two internal set s: processing (elements currently being handled) and dirty (elements waiting to be processed). Both sets store unique items, while the deque holds the combined dirty + processing data.

1.2 Queue Implementation

The implementation provides a callback mechanism ( OnAdd , OnGet , OnDone ) that allows users to intervene in the element lifecycle.

// Queue callback interface
// Callback interface
type Callback interface {
	OnAdd(any) // called after Add
	OnGet(any) // called after Get
	OnDone(any) // called after Done
}

1.3 Queue Usage Example

When using Get or GetWithBlock , you must call Done after processing; otherwise, adding the same element will return ErrorQueueElementExist .

package main

import (
	"fmt"
	"time"

	"github.com/shengyanli1982/workqueue"
)

func main() {
	q := workqueue.NewQueue(nil) // create a queue

	go func() {
		for {
			element, err := q.Get() // get element from queue
			if err != nil {
				fmt.Println(err)
				return
			}
			fmt.Println("get element:", element)
			q.Done(element) // mark element as done
		}
	}()

	_ = q.Add("hello") // add element to queue
	_ = q.Add("world")

	time.Sleep(time.Second * 2) // wait for execution

	q.Stop()
}

If you prefer a shortcut, workqueue.DefaultQueue() is equivalent to workqueue.NewQueue(nil) . To use custom callbacks, create a config with conf.WithCallback(&callback{}) and pass it to NewQueue .

1.4 Queue Code Analysis

Add

// Add element to queue
func (q *Q) Add(element any) error {
	if q.IsClosed() {
		return ErrorQueueClosed
	}

	if q.isElementMarked(element) {
		return ErrorQueueElementExist
	}

	n := q.nodepool.Get()
	n.SetData(element)

	q.cond.L.Lock()
	q.queue.Push(n)
	q.cond.Signal()
	q.cond.L.Unlock()

	q.prepare(element) // put into dirty set
	q.config.cb.OnAdd(element)

	return nil
}

Get

// Get an element from the queue.
func (q *Q) Get() (element any, err error) {
	if q.IsClosed() {
		return nil, ErrorQueueClosed
	}

	q.qlock.Lock()
	n := q.queue.Pop()
	q.qlock.Unlock()
	if n == nil {
		return nil, ErrorQueueEmpty
	}

	element = n.Data()
	q.todo(element) // move from dirty to processing
	q.config.cb.OnGet(element)
	q.nodepool.Put(n)

	return element, nil
}

GetWithBlock

// Get an element, blocking if the queue is empty.
func (q *Q) GetWithBlock() (element any, err error) {
	if q.IsClosed() {
		return nil, ErrorQueueClosed
	}

	q.cond.L.Lock()
	for q.queue.Len() == 0 {
		q.cond.Wait()
	}
	n := q.queue.Pop()
	q.cond.L.Unlock()
	if n == nil {
		return nil, ErrorQueueEmpty
	}

	element = n.Data()
	q.todo(element)
	q.config.cb.OnGet(element)
	q.nodepool.Put(n)

	return element, nil
}

2. Simple Queue

2.1 Simple Queue Overview

Simple Queue mirrors Queue but does not enforce uniqueness; it uses only a deque . It implements the same Interface (Add, Get, Len, GetWithBlock, Done, Stop, IsClosed) and supports identical callback methods.

// Simple Queue method interface
// Queue interface
type Interface interface {
	Add(element any) error // add element
	Len() int // queue length
	Get() (element any, err error) // get element
	GetWithBlock() (element any, err error) // block until element is available
	Done(element any) // mark processing complete
	Stop() // stop the queue
	IsClosed() bool // check if closed
}

2.2 Simple Queue Implementation

The design is similar to Queue but omits the set handling, allowing duplicate entries.

2.3 Simple Queue Usage Example

Unlike Queue , calling Done after Get is optional because duplicate data is allowed; the method is a no‑op kept for interface compatibility.

package main

import (
	"fmt"
	"time"

	"github.com/shengyanli1982/workqueue"
)

func main() {
	q := workqueue.NewSimpleQueue(nil) // create a simple queue

	go func() {
		for {
			element, err := q.Get()
			if err != nil {
				fmt.Println(err)
				return
			}
			fmt.Println("get element:", element)
			q.Done(element) // optional for Simple Queue
		}
	}()

	_ = q.Add("hello")
	_ = q.Add("world")

	time.Sleep(time.Second * 2)

	q.Stop()
}

Alternatively, workqueue.DefaultSimpleQueue() creates an equivalent instance. Custom callbacks are set the same way as with Queue using a config object.

2.4 Simple Queue Code Analysis

Add

// Add element to Simple Queue
func (q *SimpleQ) Add(element any) error {
	if q.IsClosed() {
		return ErrorQueueClosed
	}

	// No set handling here

	n := q.nodepool.Get()
	n.SetData(element)

	q.cond.L.Lock()
	q.queue.Push(n)
	q.cond.Signal()
	q.cond.L.Unlock()

	q.config.cb.OnAdd(element)

	return nil
}

Get

// Get an element from Simple Queue.
func (q *SimpleQ) Get() (element any, err error) {
	if q.IsClosed() {
		return nil, ErrorQueueClosed
	}

	q.qlock.Lock()
	n := q.queue.Pop()
	q.qlock.Unlock()
	if n == nil {
		return nil, ErrorQueueEmpty
	}

	element = n.Data()
	q.config.cb.OnGet(element)
	q.nodepool.Put(n)

	return element, nil
}

GetWithBlock

// Block until an element is available in Simple Queue.
func (q *Q) GetWithBlock() (element any, err error) {
	if q.IsClosed() {
		return nil, ErrorQueueClosed
	}

	q.cond.L.Lock()
	for q.queue.Len() == 0 {
		q.cond.Wait()
	}
	n := q.queue.Pop()
	q.cond.L.Unlock()
	if n == nil {
		return nil, ErrorQueueEmpty
	}

	element = n.Data()
	q.config.cb.OnGet(element)
	q.nodepool.Put(n)

	return element, nil
}

Conclusion

The article has introduced the definitions, usage patterns, and implementation principles of both Queue and Simple Queue . Depending on whether your data must be unique, choose Queue for uniqueness guarantees or Simple Queue when duplicates are acceptable. Upcoming articles will cover advanced modules such as Delaying Queue and Priority Queue, which build upon these core components.

backendgolangKubernetesData StructuresQueueWorkQueuesimple queue
Rare Earth Juejin Tech Community
Written by

Rare Earth Juejin Tech Community

Juejin, a tech community that helps developers grow.

0 followers
Reader feedback

How this landed with the community

login Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.