Build a High‑Performance Go WebSocket Server with fasthttp, Priority Queues, and Prometheus

Learn how to construct a low‑latency, scalable WebSocket server in Go using fasthttp, custom read/write pumps, priority message queues, a worker‑pool, and Prometheus metrics, with full source code, detailed module explanations, and deployment instructions for real‑time high‑concurrency applications.

Code Wrench
Code Wrench
Code Wrench
Build a High‑Performance Go WebSocket Server with fasthttp, Priority Queues, and Prometheus

Overview

This guide demonstrates how to build a high‑throughput, low‑latency WebSocket server in Go using fasthttp and fasthttp/websocket. The server replaces the standard net/http stack, runs independent read/write pumps per connection, employs a priority message queue, processes business logic via a worker pool, and exposes Prometheus metrics for observability.

Core Design Concepts

High‑Performance I/O

Replace net/http with fasthttp to reduce garbage‑collector pressure.

Each connection runs a dedicated read pump and write pump.

The write pump batches messages and respects a priority queue.

Priority Message Queue

Every connection maintains a min‑heap that orders messages by numeric priority.

High‑priority messages (e.g., heartbeats, system notifications) are sent first.

If the queue reaches sendQueueSize, new messages are dropped and counted.

Room Management & Broadcast

A global Hub maps room names to Room objects: map[string]*Room.

Rooms support intra‑room broadcast and global broadcast via the hub.

Worker Pool for Asynchronous Tasks

The read pump parses incoming frames and forwards business‑logic tasks to a fixed‑size worker pool.

The pool limits concurrency, provides back‑pressure, and records submitted/dropped tasks.

Implementation Details

Prometheus Metrics Registration

// Prometheus metrics describing connection, message, and worker-pool state.
var (
	// activeConnections counts currently open WebSocket connections.
	activeConnections = prometheus.NewGauge(prometheus.GaugeOpts{
		Name: "ws_active_connections",
		Help: "Current active connections",
	})
	// messagesSent counts every message successfully written to a client.
	messagesSent = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "ws_messages_sent_total",
		Help: "Total messages sent",
	})
	// messagesDropped counts sends rejected because a priority queue was full.
	messagesDropped = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "ws_messages_dropped_total",
		Help: "Messages dropped due to full priority queue",
	})
	// sendQueueLength reflects queued outbound messages across connections.
	sendQueueLength = prometheus.NewGauge(prometheus.GaugeOpts{
		Name: "ws_send_queue_length",
		Help: "Sum of send queue lengths across all connections",
	})
	// tasksSubmitted counts tasks accepted by the worker pool.
	tasksSubmitted = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "ws_tasks_submitted_total",
		Help: "Total tasks submitted to worker pool",
	})
	// tasksDropped counts tasks rejected because the worker queue was full.
	tasksDropped = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "ws_tasks_dropped_total",
		Help: "Tasks dropped when worker queue is full",
	})
	// workerQueueLen reflects the current depth of the worker-pool queue.
	workerQueueLen = prometheus.NewGauge(prometheus.GaugeOpts{
		Name: "ws_worker_queue_length",
		Help: "Current length of worker pool task queue",
	})
)

// init registers every metric with the default registry so the /metrics
// endpoint exposes them.
func init() {
	collectors := []prometheus.Collector{
		activeConnections, messagesSent, messagesDropped,
		sendQueueLength, tasksSubmitted, tasksDropped, workerQueueLen,
	}
	for _, c := range collectors {
		prometheus.MustRegister(c)
	}
}

Buffer Pool

var bufPool = sync.Pool{New: func() interface{} { b := make([]byte, 0, initialBufCap); return &b}}

Hub / Room Management

// Room groups a set of live connections under a name and supports
// broadcasting a message to every member.
type Room struct {
	name  string
	conns map[*ConnWrap]bool // membership set; guarded by lock
	lock  sync.RWMutex
}

// NewRoom returns an empty room with the given name.
func NewRoom(name string) *Room {
	return &Room{
		name:  name,
		conns: make(map[*ConnWrap]bool),
	}
}

// AddConn registers c as a member and records the room on the connection.
func (r *Room) AddConn(c *ConnWrap) {
	r.lock.Lock()
	defer r.lock.Unlock()
	r.conns[c] = true
	c.room = r
}

// RemoveConn drops c from the membership set.
func (r *Room) RemoveConn(c *ConnWrap) {
	r.lock.Lock()
	defer r.lock.Unlock()
	delete(r.conns, c)
}

// Broadcast enqueues msg at the given priority on every member connection.
func (r *Room) Broadcast(msg []byte, priority int) {
	r.lock.RLock()
	defer r.lock.RUnlock()
	for c := range r.conns {
		_ = c.Send(msg, priority) // full-queue drops are counted inside Send
	}
}

Worker Pool

type Task struct { Conn *ConnWrap; Data []byte }
type WorkerPool struct { tasks chan Task; wg sync.WaitGroup; quit chan struct{} }
var workerPool *WorkerPool
func NewWorkerPool(n, cap int) *WorkerPool {
    wp := &WorkerPool{tasks: make(chan Task, cap), quit: make(chan struct{})}
    wp.wg.Add(n)
    for i := 0; i < n; i++ {
        go func(id int) {
            defer wp.wg.Done()
            for {
                select {
                case t, ok := <-wp.tasks:
                    if !ok { return }
                    handleTask(id, t)
                    workerQueueLen.Set(float64(len(wp.tasks)))
                case <-wp.quit:
                    return
                }
            }
        }(i)
    }
    return wp
}
func (wp *WorkerPool) Submit(t Task) bool {
    select { case wp.tasks <- t: workerQueueLen.Set(float64(len(wp.tasks))); return true default: return false }
}
func (wp *WorkerPool) Shutdown() { close(wp.tasks); close(wp.quit); wp.wg.Wait() }

WebSocket Connection Wrapper

// ConnWrap wraps a websocket connection with its lifecycle context,
// per-connection priority send queue, and pump-goroutine bookkeeping.
type ConnWrap struct {
    conn   *websocket.Conn
    ctx    context.Context    // cancelled when the connection shuts down
    cancel context.CancelFunc // invoked by Close to stop both pumps
    room   *Room              // room joined via Room.AddConn, if any
    pq     PriorityQueue      // heap of pending outbound messages
    pqCond *sync.Cond         // guards pq; signalled when a message is queued
    closed int32              // 0 = open, 1 = closed; accessed atomically
    wg     sync.WaitGroup     // counts the read and write pump goroutines
}
// NewConnWrap wraps ws, bumps the active-connection gauge, and launches the
// connection's dedicated read and write pumps.
func NewConnWrap(ws *websocket.Conn) *ConnWrap {
	ctx, cancel := context.WithCancel(context.Background())
	c := &ConnWrap{
		conn:   ws,
		ctx:    ctx,
		cancel: cancel,
		pqCond: sync.NewCond(&sync.Mutex{}),
	}
	heap.Init(&c.pq)
	activeConnections.Inc()
	c.wg.Add(2) // one for readPump, one for writePump
	go c.readPump()
	go c.writePump()
	return c
}
// Close tears the connection down exactly once: it cancels the pumps, wakes
// the write pump, closes the socket, waits for both pumps to exit, and then
// updates metrics and room membership.
func (c *ConnWrap) Close() {
	// CompareAndSwap makes Close idempotent under concurrent callers.
	if !atomic.CompareAndSwapInt32(&c.closed, 0, 1) {
		return
	}
	c.cancel()
	// BUGFIX: a goroutine blocked in pqCond.Wait() is not woken by context
	// cancellation, so wg.Wait() below could deadlock if the write pump was
	// parked on an empty queue. Broadcast under the cond's lock to wake it.
	c.pqCond.L.Lock()
	c.pqCond.Broadcast()
	c.pqCond.L.Unlock()
	_ = c.conn.Close()
	c.wg.Wait()
	activeConnections.Dec()
	// NOTE(review): c.room is written by Room.AddConn under the room lock but
	// read here without synchronization — safe only if joins cannot race with
	// Close; verify against readPump's handling of "join:" messages.
	if c.room != nil {
		c.room.RemoveConn(c)
	}
}
// Send copies data into a pooled buffer and enqueues it at the given
// priority. It returns false (counting a drop) when the connection is
// closed or its send queue is already at sendQueueSize.
func (c *ConnWrap) Send(data []byte, priority int) bool {
	if atomic.LoadInt32(&c.closed) == 1 {
		return false
	}
	// Copy the caller's bytes into a pooled buffer so the caller may reuse
	// data immediately after Send returns.
	p := bufPool.Get().(*[]byte)
	*p = (*p)[:0]
	*p = append(*p, data...)
	// NOTE(review): on the success path the *[]byte pointer p is not retained
	// alongside item, so the write pump has to re-wrap item.data to return it
	// to bufPool — confirm writePump actually does this.
	item := &priorityMsg{data: *p, priority: priority}
	c.pqCond.L.Lock()
	if len(c.pq) >= sendQueueSize {
		// BUGFIX: snapshot the queue length while still holding the lock;
		// the previous code read len(c.pq) after unlocking, racing with the
		// write pump popping the heap.
		qlen := len(c.pq)
		c.pqCond.L.Unlock()
		bufPool.Put(p)
		messagesDropped.Inc()
		sendQueueLength.Set(float64(qlen))
		return false
	}
	heap.Push(&c.pq, item)
	qlen := len(c.pq) // snapshot under lock for the same reason as above
	c.pqCond.Signal()
	c.pqCond.L.Unlock()
	sendQueueLength.Set(float64(qlen))
	return true
}
// readPump parses messages, handles "join:", "msg:", "task:" prefixes, and forwards tasks to the worker pool.
// writePump batches queued messages, sends ping frames, and writes messages to the client.

WebSocket Handler

// upgrader accepts every origin. Tighten CheckOrigin before exposing the
// server to untrusted browsers.
var upgrader = websocket.FastHTTPUpgrader{
	CheckOrigin: func(ctx *fasthttp.RequestCtx) bool { return true },
}

// wsHandler upgrades the request to a WebSocket session, then parks the
// fasthttp goroutine until the connection's context ends before cleaning up.
func wsHandler(ctx *fasthttp.RequestCtx) {
	session := func(ws *websocket.Conn) {
		c := NewConnWrap(ws)
		<-c.ctx.Done() // keep the upgrade callback alive for the session
		c.Close()
	}
	if err := upgrader.Upgrade(ctx, session); err != nil {
		log.Printf("upgrade error: %v", err)
	}
}

Main Function & Server Startup

// main wires up the worker pool, routes /ws and /metrics over fasthttp, and
// performs a graceful shutdown on SIGINT or SIGTERM.
func main() {
	workerPool = NewWorkerPool(workerPoolSize, workerQueueCapacity)
	log.Printf("worker pool started: size=%d, queue=%d", workerPoolSize, workerQueueCapacity)

	route := func(ctx *fasthttp.RequestCtx) {
		switch string(ctx.Path()) {
		case "/ws":
			wsHandler(ctx)
		case "/metrics":
			promHandler(ctx)
		default:
			ctx.SetStatusCode(fasthttp.StatusNotFound)
		}
	}
	srv := &fasthttp.Server{Handler: route}

	ln, err := net.Listen("tcp4", addr)
	if err != nil {
		log.Fatalf("listen: %v", err)
	}
	go func() {
		log.Printf("listening on %s", addr)
		if err := srv.Serve(ln); err != nil {
			log.Fatalf("server error: %v", err)
		}
	}()

	// Park until an interrupt or termination signal arrives.
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	<-sigCh

	log.Println("shutdown: stopping server and worker pool...")
	_ = srv.Shutdown()
	workerPool.Shutdown()
	log.Println("shutdown complete")
}

Usage Instructions

Initialize the module and run the server:

go mod init golang-ws
go mod tidy
go run main.go

Test the WebSocket endpoint with JavaScript:

const socket = new WebSocket('ws://localhost:8080/ws');
socket.onopen = function() {
    socket.send('join:roomA');
    socket.send('task:hello');
};
socket.onmessage = function(e) { console.log('recv:', e.data); };
socket.onclose = function() { console.log('connection closed'); };

Prometheus metrics are exposed at http://localhost:8080/metrics.

Optimization Suggestions

Adjust the number of workers and the task‑queue capacity to match workload characteristics.

Consider using JSON or Protobuf for the message payload format.

For cross‑instance broadcasting, integrate a message broker such as Redis or NATS.

Extend the task system with retry logic, delayed queues, or a dead‑letter queue.

Open‑Source Repository

GitHub: https://github.com/louis-xie-programmer/golang-ws
Gitee (mirror): https://gitee.com/louis_xie/golang-ws
Tags: Go, Prometheus, WebSocket, priority-queue, fasthttp, Worker Pool
Code Wrench
Written by

Code Wrench

Focuses on code debugging, performance optimization, and real-world engineering, sharing efficient development tips and pitfall guides. We break down technical challenges in a down-to-earth style, helping you craft handy tools so every line of code becomes a problem‑solving weapon. 🔧💻

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.