Build a High‑Performance Go WebSocket Server with fasthttp, Priority Queues, and Prometheus
Learn how to build a low‑latency, scalable WebSocket server in Go using fasthttp, custom read/write pumps, per‑connection priority message queues, a worker pool, and Prometheus metrics, with full source code, detailed module walkthroughs, and deployment instructions for real‑time, high‑concurrency applications.
Overview
This guide demonstrates how to build a high‑throughput, low‑latency WebSocket server in Go using fasthttp and fasthttp/websocket. The server replaces the standard net/http stack, runs independent read/write pumps per connection, employs a priority message queue, processes business logic via a worker pool, and exposes Prometheus metrics for observability.
Core Design Concepts
High‑Performance I/O
Replace net/http with fasthttp to reduce garbage‑collector pressure.
Each connection runs a dedicated read pump and write pump.
The write pump batches messages and respects a priority queue.
Priority Message Queue
Every connection maintains a min‑heap that orders messages by numeric priority.
High‑priority messages (e.g., heartbeats, system notifications) are sent first.
If the queue reaches sendQueueSize, new messages are dropped and counted.
Room Management & Broadcast
A global Hub maps room names to Room objects: map[string]*Room.
Rooms support intra‑room broadcast and global broadcast via the hub.
Worker Pool for Asynchronous Tasks
The read pump parses incoming frames and forwards business‑logic tasks to a fixed‑size worker pool.
The pool limits concurrency, provides back‑pressure, and records submitted/dropped tasks.
Implementation Details
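The snippets below reference several tunables without defining them; the full source presumably sets them at package scope. The values here are illustrative assumptions, not the repository's, and the two timing knobs are used only by the pump sketches later in this post:

const (
	addr                = ":8080"          // listen address (assumed)
	initialBufCap       = 4096             // initial capacity of pooled message buffers
	sendQueueSize       = 256              // per-connection priority-queue cap
	workerPoolSize      = 64               // number of worker goroutines
	workerQueueCapacity = 1024             // buffered task-channel capacity
	writeWait           = 10 * time.Second // per-write deadline (writePump sketch)
	pingInterval        = 30 * time.Second // ping cadence (writePump sketch)
)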
Prometheus Metrics Registration
var (
	activeConnections = prometheus.NewGauge(prometheus.GaugeOpts{Name: "ws_active_connections", Help: "Current active connections"})
	messagesSent      = prometheus.NewCounter(prometheus.CounterOpts{Name: "ws_messages_sent_total", Help: "Total messages sent"})
	messagesDropped   = prometheus.NewCounter(prometheus.CounterOpts{Name: "ws_messages_dropped_total", Help: "Messages dropped due to full priority queue"})
	sendQueueLength   = prometheus.NewGauge(prometheus.GaugeOpts{Name: "ws_send_queue_length", Help: "Sum of send queue lengths across all connections"})
	tasksSubmitted    = prometheus.NewCounter(prometheus.CounterOpts{Name: "ws_tasks_submitted_total", Help: "Total tasks submitted to worker pool"})
	tasksDropped      = prometheus.NewCounter(prometheus.CounterOpts{Name: "ws_tasks_dropped_total", Help: "Tasks dropped when worker queue is full"})
	workerQueueLen    = prometheus.NewGauge(prometheus.GaugeOpts{Name: "ws_worker_queue_length", Help: "Current length of worker pool task queue"})
)

func init() {
	prometheus.MustRegister(activeConnections, messagesSent, messagesDropped, sendQueueLength, tasksSubmitted, tasksDropped, workerQueueLen)
}
Buffer Pool
var bufPool = sync.Pool{New: func() interface{} { b := make([]byte, 0, initialBufCap); return &b }}
Hub / Room Management
type Room struct {
	name  string
	conns map[*ConnWrap]bool
	lock  sync.RWMutex
}

func NewRoom(name string) *Room { return &Room{name: name, conns: make(map[*ConnWrap]bool)} }

func (r *Room) AddConn(c *ConnWrap) { r.lock.Lock(); defer r.lock.Unlock(); r.conns[c] = true; c.room = r }

func (r *Room) RemoveConn(c *ConnWrap) { r.lock.Lock(); defer r.lock.Unlock(); delete(r.conns, c) }

func (r *Room) Broadcast(msg []byte, priority int) {
	r.lock.RLock()
	defer r.lock.RUnlock()
	for c := range r.conns {
		_ = c.Send(msg, priority)
	}
}
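The design notes describe a global Hub holding map[string]*Room, but its code does not appear in the post. A minimal sketch consistent with that description (the method names and locking scheme are assumptions):

type Hub struct {
	rooms map[string]*Room
	lock  sync.RWMutex
}

var hub = &Hub{rooms: make(map[string]*Room)}

// GetOrCreate returns the named room, creating it on first use.
func (h *Hub) GetOrCreate(name string) *Room {
	h.lock.Lock()
	defer h.lock.Unlock()
	r, ok := h.rooms[name]
	if !ok {
		r = NewRoom(name)
		h.rooms[name] = r
	}
	return r
}

// BroadcastAll fans a message out to every room (the "global broadcast" from the overview).
func (h *Hub) BroadcastAll(msg []byte, priority int) {
	h.lock.RLock()
	defer h.lock.RUnlock()
	for _, r := range h.rooms {
		r.Broadcast(msg, priority)
	}
}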
Worker Pool
type Task struct { Conn *ConnWrap; Data []byte }
type WorkerPool struct { tasks chan Task; wg sync.WaitGroup; quit chan struct{} }
var workerPool *WorkerPool
func NewWorkerPool(n, cap int) *WorkerPool {
	wp := &WorkerPool{tasks: make(chan Task, cap), quit: make(chan struct{})}
	wp.wg.Add(n)
	for i := 0; i < n; i++ {
		go func(id int) {
			defer wp.wg.Done()
			for {
				select {
				case t, ok := <-wp.tasks:
					if !ok {
						return
					}
					handleTask(id, t)
					workerQueueLen.Set(float64(len(wp.tasks)))
				case <-wp.quit:
					return
				}
			}
		}(i)
	}
	return wp
}
func (wp *WorkerPool) Submit(t Task) bool {
	select {
	case wp.tasks <- t:
		tasksSubmitted.Inc()
		workerQueueLen.Set(float64(len(wp.tasks)))
		return true
	default:
		tasksDropped.Inc() // queue full: apply back-pressure by dropping
		return false
	}
}
// Shutdown stops accepting tasks and lets the workers drain the queue.
// Closing quit here as well would race the drain: a worker could take the
// quit branch and abandon queued tasks, so quit is reserved for a forced
// stop. Callers must not Submit after Shutdown.
func (wp *WorkerPool) Shutdown() {
	close(wp.tasks)
	wp.wg.Wait()
}
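Each worker calls handleTask, which the post does not list; the echo below is a placeholder assumption standing in for real business logic:

// handleTask stands in for real business logic: process the payload and
// queue a reply back to the originating connection.
func handleTask(workerID int, t Task) {
	reply := fmt.Sprintf("worker %d processed: %s", workerID, t.Data)
	_ = t.Conn.Send([]byte(reply), 5) // 5 = assumed normal priority
}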
WebSocket Connection Wrapper
type ConnWrap struct {
	conn   *websocket.Conn
	ctx    context.Context
	cancel context.CancelFunc
	room   *Room
	pq     PriorityQueue
	pqCond *sync.Cond
	closed int32
	wg     sync.WaitGroup
}
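ConnWrap embeds a PriorityQueue of priorityMsg items, but neither type appears in the post. Here is a minimal container/heap implementation matching the heap.Init/heap.Push/heap.Pop calls used in this article; per the design notes it is a min-heap, so lower numbers are sent first:

type priorityMsg struct {
	data     []byte
	priority int
}

// PriorityQueue is a min-heap: items with lower priority values pop first.
type PriorityQueue []*priorityMsg

func (pq PriorityQueue) Len() int            { return len(pq) }
func (pq PriorityQueue) Less(i, j int) bool  { return pq[i].priority < pq[j].priority }
func (pq PriorityQueue) Swap(i, j int)       { pq[i], pq[j] = pq[j], pq[i] }
func (pq *PriorityQueue) Push(x interface{}) { *pq = append(*pq, x.(*priorityMsg)) }
func (pq *PriorityQueue) Pop() interface{} {
	old := *pq
	n := len(old)
	item := old[n-1]
	old[n-1] = nil // drop the reference so the buffer can be recycled
	*pq = old[:n-1]
	return item
}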
func NewConnWrap(ws *websocket.Conn) *ConnWrap {
	ctx, cancel := context.WithCancel(context.Background())
	c := &ConnWrap{conn: ws, ctx: ctx, cancel: cancel, pqCond: sync.NewCond(&sync.Mutex{})}
	heap.Init(&c.pq)
	activeConnections.Inc()
	c.wg.Add(2)
	go c.readPump()
	go c.writePump()
	return c
}
// Close is idempotent; the CAS guarantees teardown runs exactly once.
func (c *ConnWrap) Close() {
	if !atomic.CompareAndSwapInt32(&c.closed, 0, 1) {
		return
	}
	c.cancel()
	_ = c.conn.Close()
	c.wg.Wait()
	activeConnections.Dec()
	if c.room != nil {
		c.room.RemoveConn(c)
	}
}
func (c *ConnWrap) Send(data []byte, priority int) bool {
	if atomic.LoadInt32(&c.closed) == 1 {
		return false
	}
	// Copy the payload into a pooled buffer; writePump returns it to bufPool after writing.
	p := bufPool.Get().(*[]byte)
	*p = (*p)[:0]
	*p = append(*p, data...)
	item := &priorityMsg{data: *p, priority: priority}
	c.pqCond.L.Lock()
	if len(c.pq) >= sendQueueSize {
		c.pqCond.L.Unlock()
		bufPool.Put(p)
		messagesDropped.Inc()
		return false
	}
	heap.Push(&c.pq, item)
	sendQueueLength.Inc() // the gauge is a sum across all connections, so increment rather than Set
	c.pqCond.Signal()
	c.pqCond.L.Unlock()
	return true
}
// readPump parses messages, handles "join:", "msg:", "task:" prefixes, and forwards tasks to the worker pool.
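The post summarizes the pumps rather than listing them. Below is a sketch of readPump consistent with that summary; deadline and pong handling are omitted for brevity, and anything beyond the three prefixes (the reply priority, the exact hub calls) is an assumption:

func (c *ConnWrap) readPump() {
	defer c.wg.Done()
	defer c.cancel() // any read error tears the whole connection down
	for {
		_, data, err := c.conn.ReadMessage()
		if err != nil {
			return
		}
		msg := string(data)
		switch {
		case strings.HasPrefix(msg, "join:"):
			// Join (or create) a room; leaving the previous room is elided in this sketch.
			hub.GetOrCreate(strings.TrimPrefix(msg, "join:")).AddConn(c)
		case strings.HasPrefix(msg, "msg:"):
			if c.room != nil {
				c.room.Broadcast([]byte(strings.TrimPrefix(msg, "msg:")), 5) // 5 = assumed normal priority
			}
		case strings.HasPrefix(msg, "task:"):
			payload := append([]byte(nil), strings.TrimPrefix(msg, "task:")...)
			_ = workerPool.Submit(Task{Conn: c, Data: payload}) // Submit records drops
		}
	}
}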
// writePump batches queued messages, sends ping frames, and writes messages to the client.
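And a sketch of writePump consistent with the Send path above: it pops in priority order, returns buffers to bufPool, and pings via WriteControl, which fasthttp/websocket (like its gorilla ancestor) permits concurrently with other writes. True batching and finer deadline handling are elided:

func (c *ConnWrap) writePump() {
	defer c.wg.Done()
	defer c.cancel()

	// Wake the cond wait when the connection is being torn down; taking the
	// lock before Broadcast avoids a missed-wakeup race with the wait loop.
	go func() {
		<-c.ctx.Done()
		c.pqCond.L.Lock()
		c.pqCond.Broadcast()
		c.pqCond.L.Unlock()
	}()

	// Heartbeats: control frames bypass the priority queue entirely.
	go func() {
		ticker := time.NewTicker(pingInterval)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				if err := c.conn.WriteControl(websocket.PingMessage, nil, time.Now().Add(writeWait)); err != nil {
					c.cancel()
					return
				}
			case <-c.ctx.Done():
				return
			}
		}
	}()

	for {
		c.pqCond.L.Lock()
		for len(c.pq) == 0 && c.ctx.Err() == nil {
			c.pqCond.Wait()
		}
		if c.ctx.Err() != nil {
			c.pqCond.L.Unlock()
			return
		}
		item := heap.Pop(&c.pq).(*priorityMsg)
		c.pqCond.L.Unlock()
		sendQueueLength.Dec()

		_ = c.conn.SetWriteDeadline(time.Now().Add(writeWait))
		err := c.conn.WriteMessage(websocket.TextMessage, item.data)
		bufPool.Put(&item.data) // recycle the payload buffer allocated in Send
		if err != nil {
			return
		}
		messagesSent.Inc()
	}
}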
WebSocket Handler
// Allow all origins for the demo; tighten CheckOrigin in production.
var upgrader = websocket.FastHTTPUpgrader{CheckOrigin: func(ctx *fasthttp.RequestCtx) bool { return true }}
func wsHandler(ctx *fasthttp.RequestCtx) {
	err := upgrader.Upgrade(ctx, func(ws *websocket.Conn) {
		c := NewConnWrap(ws)
		<-c.ctx.Done() // block until the connection tears itself down
		c.Close()
	})
	if err != nil {
		log.Printf("upgrade error: %v", err)
	}
}
Main Function & Server Startup
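The router in main (below) dispatches /metrics to promHandler, which the post never defines. A common bridge, and only a sketch of one way to do it rather than necessarily the repository's, is the fasthttpadaptor package together with the standard promhttp handler:

import (
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"github.com/valyala/fasthttp/fasthttpadaptor"
)

// promHandler adapts the net/http-based promhttp handler to fasthttp.
var promHandler = fasthttpadaptor.NewFastHTTPHandlerFunc(promhttp.Handler().ServeHTTP)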
func main() {
	workerPool = NewWorkerPool(workerPoolSize, workerQueueCapacity)
	log.Printf("worker pool started: size=%d, queue=%d", workerPoolSize, workerQueueCapacity)
	srv := &fasthttp.Server{Handler: func(ctx *fasthttp.RequestCtx) {
		switch string(ctx.Path()) {
		case "/ws":
			wsHandler(ctx)
		case "/metrics":
			promHandler(ctx)
		default:
			ctx.SetStatusCode(fasthttp.StatusNotFound)
		}
	}}
	ln, err := net.Listen("tcp4", addr)
	if err != nil {
		log.Fatalf("listen: %v", err)
	}
	go func() {
		log.Printf("listening on %s", addr)
		if err := srv.Serve(ln); err != nil {
			log.Fatalf("server error: %v", err)
		}
	}()
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	<-sigCh
	log.Println("shutdown: stopping server and worker pool...")
	_ = srv.Shutdown()
	workerPool.Shutdown()
	log.Println("shutdown complete")
}
Usage Instructions
Initialize the module and run the server:
go mod init golang-ws
go mod tidy
go run main.go
Test the WebSocket endpoint with JavaScript:
const socket = new WebSocket('ws://localhost:8080/ws');
socket.onopen = function() {
	socket.send('join:roomA');
	socket.send('task:hello');
};
socket.onmessage = function(e) { console.log('recv:', e.data); };
socket.onclose = function() { console.log('connection closed'); };
Prometheus metrics are exposed at http://localhost:8080/metrics.
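With a client or two connected, you can check the gauges and counters directly; the sample output below is illustrative, not captured from a real run:

curl -s http://localhost:8080/metrics | grep '^ws_'
ws_active_connections 2
ws_messages_sent_total 143
ws_messages_dropped_total 0
ws_send_queue_length 0
ws_tasks_submitted_total 7
ws_tasks_dropped_total 0
ws_worker_queue_length 0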
Optimization Suggestions
Adjust the number of workers and the task‑queue capacity to match workload characteristics.
Consider using JSON or Protobuf for the message payload format.
For cross‑instance broadcasting, integrate a message broker such as Redis or NATS (a minimal Redis sketch follows this list).
Extend the task system with retry logic, delayed queues, or a dead‑letter queue.
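To make the broker suggestion concrete, here is a minimal Redis Pub/Sub bridge. It is a sketch only: the github.com/redis/go-redis/v9 dependency, the ws:broadcast channel name, and the hypothetical broker package are all assumptions, not part of the repository:

package broker // hypothetical helper package

import (
	"context"

	"github.com/redis/go-redis/v9"
)

const broadcastChannel = "ws:broadcast" // assumed channel name

var rdb = redis.NewClient(&redis.Options{Addr: "localhost:6379"})

// Publish sends a payload to every server instance subscribed to the channel.
func Publish(ctx context.Context, payload []byte) error {
	return rdb.Publish(ctx, broadcastChannel, payload).Err()
}

// Subscribe replays messages from other instances into a local delivery
// function, e.g. func(b []byte) { hub.BroadcastAll(b, 5) }.
func Subscribe(ctx context.Context, deliver func([]byte)) {
	sub := rdb.Subscribe(ctx, broadcastChannel)
	defer sub.Close()
	for msg := range sub.Channel() {
		deliver([]byte(msg.Payload))
	}
}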
Open‑Source Repository
GitHub: https://github.com/louis-xie-programmer/golang-ws
Gitee (mirror): https://gitee.com/louis_xie/golang-ws
Code Wrench
Code Wrench focuses on code debugging, performance optimization, and real-world engineering, sharing practical development tips and pitfall guides. We break down technical challenges in plain, down-to-earth language, helping you build handy tools so that every line of code becomes a problem-solving weapon. 🔧💻