Understanding and Implementing a RateLimiting Queue in Go's WorkQueue Project
This article introduces the RateLimiting Queue module of the Go WorkQueue project, explains why it builds on the Delaying Queue, details its design, interfaces, and usage examples, and provides code snippets for creating and operating a rate‑limited queue in backend systems.
The author, a veteran of 17 years in the IT industry, presents the final module of the WorkQueue project – the RateLimiting Queue – following previous articles on Delaying Queue and Priority Queue.
RateLimiting Queue is useful in scenarios such as Kubernetes controllers handling large data streams or high‑traffic API services where processing speed must be throttled to maintain stability.
Its overall design mirrors the client-go implementation: it extends the Delaying Queue, which is itself built on the basic Queue, and combines the Queue, a four-ary min-heap (the "min quad heap"), and a Limiter to enforce rate limits.
The module’s source code is available on GitHub, and readers are encouraged to review the earlier articles for context.
Why depend on Delaying Queue? The Delaying Queue provides ordered, time‑based processing, which is essential for implementing rate limiting by scheduling elements according to calculated delays.
The RateLimiting Queue inherits all features of both Queue and Delaying Queue while adding rate‑limiting capabilities. It offers a simple interface for adding elements with limits, forgetting elements, and querying how many times an element has been limited.
Key interfaces:
type RateLimitingInterface interface {
	DelayingInterface
	AddLimited(element any) error
	Forget(element any)
	NumLimitTimes(element any) int
}

Callback interface:
type RateLimitingCallback interface {
	DelayingCallback
	OnAddLimited(any)
	OnForget(any)
	OnGetTimes(any, int)
}

Example usage demonstrates creating a queue with a token-bucket limiter, adding elements (immediate, delayed, and rate-limited), processing them in a goroutine, and marking completion with Done:
package main

import (
	"fmt"
	"time"

	"github.com/shengyanli1982/workqueue"
)

// rateLimitingcallback is a user-defined type implementing the
// RateLimitingCallback interface; its definition is omitted here.

func main() {
	conf := workqueue.NewRateLimitingQConfig().
		WithLimiter(workqueue.NewBucketRateLimiter(float64(1), 1)).
		WithCallback(&rateLimitingcallback{})
	q := workqueue.NewRateLimitingQueue(conf) // create a rate-limiting queue
	defer q.Stop()

	go func() {
		for {
			element, err := q.Get() // get an element from the queue
			if err != nil {
				fmt.Println(err)
				return
			}
			fmt.Println(">> get element:", element)
			q.Done(element) // mark the element as done; 'Done' is required after 'Get'
		}
	}()

	_ = q.Add("hello")                     // add element, executed immediately
	_ = q.Add("world")
	_ = q.AddAfter("delay", time.Second*1) // add element, executed after 1 second
	_ = q.AddLimited("burst")              // add element with rate limit (1/s, burst 1)
	_ = q.AddLimited("limit")

	time.Sleep(time.Second * 2) // wait for elements to be processed
}

The output shows elements being processed in order while respecting the rate limit. The article also notes that DefaultRateLimitingQueue() is equivalent to creating a queue with a nil configuration.
Additional notes emphasize that a custom Delaying Queue can be bound to the RateLimiting Queue as long as it implements the required interfaces, and that the Forget and NumLimitTimes methods provide finer control over element lifecycle.
In summary, the RateLimiting Queue is the most advanced queue in the WorkQueue project, combining basic queueing, delayed scheduling, and token‑bucket rate limiting to offer a powerful tool for backend developers handling high‑throughput, time‑sensitive workloads.