Understanding Delaying Queue and Priority Queue in WorkQueue: Implementation, Usage, and Underlying Quadruple Heap
This article explains the design, implementation details, and practical usage of the Delaying Queue and Priority Queue modules in the WorkQueue library, covering their inheritance from the core Queue, the role of the Quadruple Heap data structure, and providing Go code examples for integration in backend systems.
Author Lee, a veteran with 17 years in the IT industry, continues the WorkQueue series by introducing the advanced modules: Delaying Queue and Priority Queue, which extend the core Queue functionality with time‑based and priority‑based task execution.
Background: In many business scenarios, tasks need to be executed after a delay or according to priority. Embedding such logic directly in business code leads to poor reusability, redundancy, and error‑prone implementations. WorkQueue offers generic solutions via its Delaying Queue and Priority Queue modules.
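Why Use a Quadruple Heap: Both modules rely on a Quadruple Heap (a four‑ary heap) instead of a binary heap. With four children per node, the tree is roughly half as deep (about log4 n levels instead of log2 n), so an insert needs fewer parent comparisons. A delete compares up to four children per level but descends through fewer levels, which tends to pay off for large data sets. The heap maintains the usual min/max property, enabling efficient retrieval of the next element to process.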
Queue Overview: The core Queue provides basic enqueue/dequeue operations. WorkQueue chooses a Min Quadruple Heap as its heap implementation to balance performance and ease of use.
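To make the heap layout concrete, here is a minimal sketch of a min four‑ary heap over int64 values. It is not WorkQueue's implementation: the quadHeap type and the drain helper are hypothetical names introduced only for illustration. It shows the two index rules that distinguish a four‑ary heap from a binary one: the parent of node i sits at (i-1)/4, and its children sit at 4i+1 through 4i+4.

```go
package main

import "fmt"

// quadHeap is a hypothetical min-heap where each node has up to four children.
// It is an illustration only and is not taken from the WorkQueue source.
type quadHeap struct {
	items []int64
}

// Push appends a value and sifts it up until its parent is not larger.
func (h *quadHeap) Push(v int64) {
	h.items = append(h.items, v)
	i := len(h.items) - 1
	for i > 0 {
		parent := (i - 1) / 4
		if h.items[parent] <= h.items[i] {
			break
		}
		h.items[parent], h.items[i] = h.items[i], h.items[parent]
		i = parent
	}
}

// Pop removes and returns the smallest value; ok is false when the heap is empty.
func (h *quadHeap) Pop() (v int64, ok bool) {
	n := len(h.items)
	if n == 0 {
		return 0, false
	}
	v = h.items[0]
	h.items[0] = h.items[n-1]
	h.items = h.items[:n-1]
	n--
	i := 0
	for {
		smallest := i
		// The children of node i live at indexes 4i+1 .. 4i+4.
		for c := 4*i + 1; c <= 4*i+4 && c < n; c++ {
			if h.items[c] < h.items[smallest] {
				smallest = c
			}
		}
		if smallest == i {
			break
		}
		h.items[i], h.items[smallest] = h.items[smallest], h.items[i]
		i = smallest
	}
	return v, true
}

// drain pops every value, which yields them in ascending order.
func drain(h *quadHeap) []int64 {
	var out []int64
	for {
		v, ok := h.Pop()
		if !ok {
			return out
		}
		out = append(out, v)
	}
}

func main() {
	h := &quadHeap{}
	for _, v := range []int64{42, 7, 19, 3, 25, 11, 1, 30} {
		h.Push(v)
	}
	fmt.Println(drain(h)) // [1 3 7 11 19 25 30 42]
}
```

In a delaying or priority queue the stored value would be an expiration timestamp or a weight, so the root is always the next element due for processing.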
1. Delaying Queue
The Delaying Queue schedules elements to become eligible for execution after a specified delay. It inherits all interfaces and callbacks from Queue and adds the AddAfter method and the OnAddAfter callback.
type DelayingInterface interface {
	Interface
	// AddAfter adds an element that will be executed after a delay
	AddAfter(element any, delay time.Duration) error
}
Implementation highlights: AddAfter checks whether the queue is closed, delegates non‑positive delays to Add, sets the element's expiration timestamp, pushes the element onto the waiting heap, and invokes the callback.
func (q *DelayingQ) AddAfter(element any, delay time.Duration) error {
	// Reject new elements once the queue has been closed.
	if q.IsClosed() {
		return ErrorQueueClosed
	}
	// A non-positive delay means no waiting: enqueue immediately.
	if delay <= 0 {
		return q.Add(element)
	}
	// Take an element from the pool and stamp it with its expiration time.
	ele := q.elepool.Get()
	ele.SetData(element)
	ele.SetValue(time.Now().Add(delay).UnixMilli()) // set expiration
	// Push onto the waiting heap under the lock.
	q.lock.Lock()
	q.waiting.Push(ele)
	q.lock.Unlock()
	// Notify the registered callback.
	q.config.cb.OnAddAfter(element, delay)
	return nil
}
Typical usage creates a queue (e.g., workqueue.NewDelayingQueue(nil) or workqueue.DefaultDelayingQueue()), starts a goroutine that consumes elements with Get, processes each one, and calls Done to mark completion. Elements can be added immediately with Add or after a delay with AddAfter.
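package main

import (
	"fmt"
	"time"

	"github.com/shengyanli1982/workqueue"
)

func main() {
	// Create a delaying queue with the default configuration.
	q := workqueue.NewDelayingQueue(nil)

	// Consumer: fetch elements, process them, and mark them done.
	go func() {
		for {
			element, err := q.Get()
			if err != nil {
				fmt.Println(err)
				return
			}
			fmt.Println("get element:", element)
			q.Done(element)
		}
	}()

	_ = q.Add("hello")                   // available immediately
	_ = q.AddAfter("delay", time.Second) // available after one second

	// Give the delayed element time to expire before stopping the queue.
	time.Sleep(2 * time.Second)
	q.Stop()
}
Custom queues can be bound by providing a configuration that implements the Interface and passing it to NewDelayingQueue.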
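AddAfter only places the element on the waiting heap; something must later move expired elements into the main queue. The sketch below shows one way that hand-off could work, built on the standard library's container/heap rather than a Quadruple Heap. It is an assumption about the general technique, not WorkQueue's actual code: delayed, waitingHeap, schedule, and popExpired are hypothetical names.

```go
package main

import (
	"container/heap"
	"fmt"
)

// delayed pairs a payload with its expiration time in Unix milliseconds.
// Hypothetical type, used only for this illustration.
type delayed struct {
	data     any
	expireAt int64
}

// waitingHeap orders delayed items by expiration time, earliest first.
type waitingHeap []delayed

func (h waitingHeap) Len() int           { return len(h) }
func (h waitingHeap) Less(i, j int) bool { return h[i].expireAt < h[j].expireAt }
func (h waitingHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *waitingHeap) Push(x any)        { *h = append(*h, x.(delayed)) }
func (h *waitingHeap) Pop() any {
	old := *h
	n := len(old)
	item := old[n-1]
	*h = old[:n-1]
	return item
}

// schedule places a payload on the waiting heap with the given expiration.
func schedule(h *waitingHeap, data any, expireAt int64) {
	heap.Push(h, delayed{data: data, expireAt: expireAt})
}

// popExpired removes and returns, in expiration order, every payload whose
// expiration time is at or before now. A real queue would call something like
// this periodically and add the results to the main queue.
func popExpired(h *waitingHeap, now int64) []any {
	var ready []any
	for h.Len() > 0 && (*h)[0].expireAt <= now {
		ready = append(ready, heap.Pop(h).(delayed).data)
	}
	return ready
}

func main() {
	h := &waitingHeap{}
	schedule(h, "c", 300)
	schedule(h, "a", 100)
	schedule(h, "b", 200)

	fmt.Println(popExpired(h, 250)) // [a b]
	fmt.Println(h.Len())            // 1
}
```

Because the heap root is always the earliest expiration, the loop can stop at the first element that is not yet due without scanning the rest.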
2. Priority Queue
The Priority Queue orders elements based on a weight value; lower weight means higher priority. It also inherits all Queue interfaces and adds AddWeight and the OnAddWeight callback.
type PriorityInterface interface {
	Interface
	// AddWeight adds an element with a specific weight and sorts it within a time window
	AddWeight(element any, weight int) error
}
The default sorting window is 500 ms, configurable via PriorityQConfig.WithWindow. Elements with weight ≤ 0 are added directly and executed immediately.
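func (q *PriorityQ) AddWeight(element any, weight int) error {
	// Reject new elements once the queue has been closed.
	if q.IsClosed() {
		return ErrorQueueClosed
	}
	// A non-positive weight bypasses sorting: enqueue immediately.
	if weight <= 0 {
		return q.Add(element)
	}
	// Take an element from the pool and record its weight as the sort key.
	ele := q.elepool.Get()
	ele.SetData(element)
	ele.SetValue(int64(weight)) // set weight
	// Push onto the waiting heap under the lock.
	q.lock.Lock()
	q.waiting.Push(ele)
	q.lock.Unlock()
	// Notify the registered callback.
	q.config.cb.OnAddWeight(element, weight)
	return nil
}
Typical usage creates a priority queue with a custom window (e.g., workqueue.NewPriorityQConfig().WithWindow(time.Second)), consumes elements in the same way as the Delaying Queue, and adds items via Add or AddWeight.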
package main

import (
	"fmt"
	"time"

	"github.com/shengyanli1982/workqueue"
)

func main() {
	// Sort weighted elements within a one-second window.
	conf := workqueue.NewPriorityQConfig().WithWindow(time.Second)
	q := workqueue.NewPriorityQueue(conf)

	// Consumer: fetch elements, process them, and mark them done.
	go func() {
		for {
			element, err := q.Get()
			if err != nil {
				fmt.Println(err)
				return
			}
			fmt.Println("get element:", element)
			q.Done(element)
		}
	}()

	_ = q.Add("hello")              // no weight: added directly
	_ = q.AddWeight("priority", 10) // sorted by weight within the window

	// Wait longer than the window so the weighted element is released.
	time.Sleep(2 * time.Second)
	q.Stop()
}
Custom queues can also be bound to a Priority Queue by providing a configuration that implements the required Interface.
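Sorting "within a time window" has a practical consequence: ordering by weight is only guaranteed among elements collected in the same window. The sketch below illustrates that behavior under the assumption that each window's elements are released together in ascending weight order. It uses sort.SliceStable on a batch instead of a heap for brevity, and weighted and releaseWindow are hypothetical names, not part of WorkQueue.

```go
package main

import (
	"fmt"
	"sort"
)

// weighted pairs a payload with its weight; a lower weight means higher priority.
// Hypothetical type, used only for this illustration.
type weighted struct {
	data   any
	weight int
}

// releaseWindow returns the payloads collected during one window in ascending
// weight order. Elements with equal weight keep their insertion order.
func releaseWindow(batch []weighted) []any {
	sorted := make([]weighted, len(batch))
	copy(sorted, batch)
	sort.SliceStable(sorted, func(i, j int) bool {
		return sorted[i].weight < sorted[j].weight
	})
	out := make([]any, 0, len(sorted))
	for _, w := range sorted {
		out = append(out, w.data)
	}
	return out
}

func main() {
	// Elements added during the first window.
	window1 := []weighted{{"low", 10}, {"high", 2}, {"mid", 5}}
	// An element added after the first window has already been released.
	window2 := []weighted{{"late-high", 1}}

	fmt.Println(releaseWindow(window1)) // [high mid low]
	fmt.Println(releaseWindow(window2)) // [late-high]
}
```

Under this assumption, "late-high" is processed after "low" despite its lower weight, because it arrived in a later window. A longer window gives stricter ordering at the cost of added latency before weighted elements become available.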
Conclusion
This article introduced the definitions, usage patterns, and internal implementations of the Delaying Queue and Priority Queue in the WorkQueue project, highlighting their reliance on the Quadruple Heap and their inheritance from the core Queue. Choosing between the regular Queue and the Simple Queue depends on specific business needs. The next article will cover the RateLimiting Queue module.
Rare Earth Juejin Tech Community