Implementation and Usage of Queue and Simple Queue in the Go WorkQueue Project
This article explains the design, implementation, and usage of the Queue and Simple Queue modules in the Go-based WorkQueue project, covering their underlying data structures, interfaces, callback mechanisms, and practical code examples for both unique and duplicate data handling.
I am Lee, a veteran with 17 years of experience in the IT industry, presenting the second article of the WorkQueue project series, focusing on the core modules Queue and Simple Queue and how to use them.
Event Background
In everyday development, Queue structures are widely used, such as storing messages in a message queue, and many Kubernetes components (e.g., Controller 's WorkQueue , Scheduler 's SchedulingQueue , Kubelet 's PodWorkers ) rely on FIFO queues. The data can be either repeatable or guaranteed unique, and WorkQueue implements both scenarios using Queue and Simple Queue .
Queue Introduction
Both Queue and Simple Queue share a FIFO logical structure, but differ in implementation details. Queue ensures data uniqueness using a set and stores order with a deque , while Simple Queue allows duplicate data and relies solely on a deque .
1. Queue
1.1 Queue Overview
Queue is a FIFO queue that implements the standard Interface with methods such as Add , Get , Len , GetWithBlock , Done , Stop , and IsClosed . It uses a set to guarantee uniqueness and a deque to preserve order.
// Queue method interface
// Queue interface
type Interface interface {
Add(element any) error // add element
Len() int // queue length
Get() (element any, err error) // get element
GetWithBlock() (element any, err error) // block until element is available
Done(element any) // mark processing complete
Stop() // stop the queue
IsClosed() bool // check if closed
}The queue maintains two internal set s: processing (elements currently being handled) and dirty (elements waiting to be processed). Both sets store unique items, while the deque holds the combined dirty + processing data.
1.2 Queue Implementation
The implementation provides a callback mechanism ( OnAdd , OnGet , OnDone ) that allows users to intervene in the element lifecycle.
// Queue callback interface
// Callback interface
type Callback interface {
OnAdd(any) // called after Add
OnGet(any) // called after Get
OnDone(any) // called after Done
}1.3 Queue Usage Example
When using Get or GetWithBlock , you must call Done after processing; otherwise, adding the same element will return ErrorQueueElementExist .
package main
import (
"fmt"
"time"
"github.com/shengyanli1982/workqueue"
)
func main() {
q := workqueue.NewQueue(nil) // create a queue
go func() {
for {
element, err := q.Get() // get element from queue
if err != nil {
fmt.Println(err)
return
}
fmt.Println("get element:", element)
q.Done(element) // mark element as done
}
}()
_ = q.Add("hello") // add element to queue
_ = q.Add("world")
time.Sleep(time.Second * 2) // wait for execution
q.Stop()
}If you prefer a shortcut, workqueue.DefaultQueue() is equivalent to workqueue.NewQueue(nil) . To use custom callbacks, create a config with conf.WithCallback(&callback{}) and pass it to NewQueue .
1.4 Queue Code Analysis
Add
// Add element to queue
func (q *Q) Add(element any) error {
if q.IsClosed() {
return ErrorQueueClosed
}
if q.isElementMarked(element) {
return ErrorQueueElementExist
}
n := q.nodepool.Get()
n.SetData(element)
q.cond.L.Lock()
q.queue.Push(n)
q.cond.Signal()
q.cond.L.Unlock()
q.prepare(element) // put into dirty set
q.config.cb.OnAdd(element)
return nil
}Get
// Get an element from the queue.
func (q *Q) Get() (element any, err error) {
if q.IsClosed() {
return nil, ErrorQueueClosed
}
q.qlock.Lock()
n := q.queue.Pop()
q.qlock.Unlock()
if n == nil {
return nil, ErrorQueueEmpty
}
element = n.Data()
q.todo(element) // move from dirty to processing
q.config.cb.OnGet(element)
q.nodepool.Put(n)
return element, nil
}GetWithBlock
// Get an element, blocking if the queue is empty.
func (q *Q) GetWithBlock() (element any, err error) {
if q.IsClosed() {
return nil, ErrorQueueClosed
}
q.cond.L.Lock()
for q.queue.Len() == 0 {
q.cond.Wait()
}
n := q.queue.Pop()
q.cond.L.Unlock()
if n == nil {
return nil, ErrorQueueEmpty
}
element = n.Data()
q.todo(element)
q.config.cb.OnGet(element)
q.nodepool.Put(n)
return element, nil
}2. Simple Queue
2.1 Simple Queue Overview
Simple Queue mirrors Queue but does not enforce uniqueness; it uses only a deque . It implements the same Interface (Add, Get, Len, GetWithBlock, Done, Stop, IsClosed) and supports identical callback methods.
// Simple Queue method interface
// Queue interface
type Interface interface {
Add(element any) error // add element
Len() int // queue length
Get() (element any, err error) // get element
GetWithBlock() (element any, err error) // block until element is available
Done(element any) // mark processing complete
Stop() // stop the queue
IsClosed() bool // check if closed
}2.2 Simple Queue Implementation
The design is similar to Queue but omits the set handling, allowing duplicate entries.
2.3 Simple Queue Usage Example
Unlike Queue , calling Done after Get is optional because duplicate data is allowed; the method is a no‑op kept for interface compatibility.
package main
import (
"fmt"
"time"
"github.com/shengyanli1982/workqueue"
)
func main() {
q := workqueue.NewSimpleQueue(nil) // create a simple queue
go func() {
for {
element, err := q.Get()
if err != nil {
fmt.Println(err)
return
}
fmt.Println("get element:", element)
q.Done(element) // optional for Simple Queue
}
}()
_ = q.Add("hello")
_ = q.Add("world")
time.Sleep(time.Second * 2)
q.Stop()
}Alternatively, workqueue.DefaultSimpleQueue() creates an equivalent instance. Custom callbacks are set the same way as with Queue using a config object.
2.4 Simple Queue Code Analysis
Add
// Add element to Simple Queue
func (q *SimpleQ) Add(element any) error {
if q.IsClosed() {
return ErrorQueueClosed
}
// No set handling here
n := q.nodepool.Get()
n.SetData(element)
q.cond.L.Lock()
q.queue.Push(n)
q.cond.Signal()
q.cond.L.Unlock()
q.config.cb.OnAdd(element)
return nil
}Get
// Get an element from Simple Queue.
func (q *SimpleQ) Get() (element any, err error) {
if q.IsClosed() {
return nil, ErrorQueueClosed
}
q.qlock.Lock()
n := q.queue.Pop()
q.qlock.Unlock()
if n == nil {
return nil, ErrorQueueEmpty
}
element = n.Data()
q.config.cb.OnGet(element)
q.nodepool.Put(n)
return element, nil
}GetWithBlock
// Block until an element is available in Simple Queue.
func (q *Q) GetWithBlock() (element any, err error) {
if q.IsClosed() {
return nil, ErrorQueueClosed
}
q.cond.L.Lock()
for q.queue.Len() == 0 {
q.cond.Wait()
}
n := q.queue.Pop()
q.cond.L.Unlock()
if n == nil {
return nil, ErrorQueueEmpty
}
element = n.Data()
q.config.cb.OnGet(element)
q.nodepool.Put(n)
return element, nil
}Conclusion
The article has introduced the definitions, usage patterns, and implementation principles of both Queue and Simple Queue . Depending on whether your data must be unique, choose Queue for uniqueness guarantees or Simple Queue when duplicates are acceptable. Upcoming articles will cover advanced modules such as Delaying Queue and Priority Queue, which build upon these core components.
Rare Earth Juejin Tech Community
Juejin, a tech community that helps developers grow.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.