Source Code Analysis of the netpoll Go High‑Performance Network Framework
This article dissects netpoll, a Go-based high-performance network framework built on a Multi-Reactor model with one main reactor and multiple sub-reactors. It details the server-side EventLoop, the poll manager that load-balances across epoll/kqueue pollers, connection buffering and request handling, and the client's non-blocking dialing process.
netpoll is an open‑source high‑performance network framework written in Go. It follows a Multi‑Reactor model and is mainly used for RPC scenarios. The article reviews the framework’s architecture, the internal implementation of its server and client sides, and the underlying poll manager that abstracts epoll/kqueue.
1. Reactor model overview
The classic Multi-Reactor model consists of a single mainReactor that accepts new connections and distributes them to multiple subReactors for further processing. The reactors themselves handle only I/O events; business logic runs in separate worker goroutines.
2. netpoll server side
The server is started through an EventLoop implementation. The EventLoop interface is defined as:
type EventLoop interface {
// Serve registers a listener and runs blockingly to provide services, including listening to ports,
// accepting connections and processing transferred data. When an exception occurs or Shutdown is invoked,
// Serve will return an error which describes the specific reason.
Serve(ln net.Listener) error
// Shutdown is used to gracefully exit.
// It will close all idle connections on the server, but will not change the underlying pollers.
// Argument: ctx sets the waiting deadline, after which an error is returned;
// connections still in progress are not forcibly closed.
Shutdown(ctx context.Context) error
}
Creating an EventLoop and starting the server:
func NewEventLoop(onRequest OnRequest, ops ...Option) (EventLoop, error) {
opts := &options{onRequest: onRequest}
for _, do := range ops { do.f(opts) }
return &eventLoop{opts: opts, stop: make(chan error, 1)}, nil
}
func (evl *eventLoop) Serve(ln net.Listener) error {
npln, err := ConvertListener(ln)
if err != nil { return err }
evl.Lock()
evl.svr = newServer(npln, evl.opts, evl.quit)
evl.svr.Run() // registers the listener with a poller; Serve then blocks in waitQuit below
evl.Unlock()
err = evl.waitQuit()
runtime.SetFinalizer(evl, nil)
return err
}
The server creates an FDOperator for the listening socket, picks a poller from the poll manager, and registers the socket for readable events:
func (s *server) Run() (err error) {
s.operator = FDOperator{FD: s.ln.Fd(), OnRead: s.OnRead, OnHup: s.OnHup}
s.operator.poll = pollmanager.Pick() // mainReactor
err = s.operator.Control(PollReadable)
if err != nil { s.onQuit(err) }
return err
}
When a new connection arrives, OnRead accepts it, creates a connection object, and registers the connection with a sub‑reactor:
func (s *server) OnRead(p Poll) error {
conn, err := s.ln.Accept()
if err != nil { return err }
connection := &connection{}
connection.init(conn.(Conn), s.opts)
// register to a sub‑reactor (pollmanager will pick one)
connection.onConnect()
return nil
}
3. Poll manager
The pollmanager maintains a slice of Poll objects (epoll or kqueue) and provides load‑balancing among them.
type manager struct {
NumLoops int
balance loadbalance
polls []Poll // all poll instances
}
func (m *manager) Run() error {
for idx := len(m.polls); idx < m.NumLoops; idx++ {
poll := openPoll() // creates epoll/kqueue
m.polls = append(m.polls, poll)
go poll.Wait()
}
m.balance.Rebalance(m.polls)
return nil
}
func (m *manager) Pick() Poll { return m.balance.Pick() }
The Poll interface abstracts the OS‑specific event notification mechanisms:
type Poll interface {
Wait() error
Close() error
Trigger() error
Control(operator *FDOperator, event PollEvent) error
}
For Linux the default implementation is defaultPoll, which wraps the epoll system calls (epoll_create1, epoll_ctl, epoll_wait):
func EpollCtl(epfd int, op int, fd int, event *epollevent) (err error) {
_, _, err = syscall.RawSyscall6(syscall.SYS_EPOLL_CTL, uintptr(epfd), uintptr(op), uintptr(fd), uintptr(unsafe.Pointer(event)), 0, 0)
if err == syscall.Errno(0) { err = nil }
return err
}
func EpollWait(epfd int, events []epollevent, msec int) (n int, err error) {
var r0 uintptr
var p0 = unsafe.Pointer(&events[0])
if msec == 0 {
r0, _, err = syscall.RawSyscall6(syscall.SYS_EPOLL_WAIT, uintptr(epfd), uintptr(p0), uintptr(len(events)), 0, 0, 0)
} else {
r0, _, err = syscall.Syscall6(syscall.SYS_EPOLL_WAIT, uintptr(epfd), uintptr(p0), uintptr(len(events)), uintptr(msec), 0, 0)
}
if err == syscall.Errno(0) { err = nil }
return int(r0), err
}
4. Connection handling
When a connection is initialized, buffers, barriers and callbacks are set up. The inputAck method triggers the user‑defined OnRequest callback to process business logic.
func (c *connection) inputAck(n int) (err error) {
if n < 0 { n = 0 }
length, _ := c.inputBuffer.bookAck(n)
var needTrigger = true
if length == n { // first time data is ready
needTrigger = c.onRequest()
}
if needTrigger && length >= int(atomic.LoadInt32(&c.waitReadSize)) {
c.triggerRead()
}
return nil
}
func (c *connection) onRequest() (needTrigger bool) {
onRequest, ok := c.onRequestCallback.Load().(OnRequest)
if !ok { return true }
processed := c.onProcess(
func(c *connection) bool { return c.Reader().Len() > 0 && c.IsActive() }, // isProcessable
func(c *connection) { _ = onRequest(c.ctx, c) }, // process
)
return !processed
}
Writing data uses a buffered approach; Flush sends the buffered bytes directly via sendmsg, or registers the fd for writable events if the socket would block.
func (c *connection) Flush() error {
if !c.lock(flushing) { return Exception(ErrConnClosed, "when flush") }
defer c.unlock(flushing)
c.outputBuffer.Flush()
return c.flush()
}
func (c *connection) flush() error {
if c.outputBuffer.IsEmpty() { return nil }
bs := c.outputBuffer.GetBytes(c.outputBarrier.bs)
n, err := sendmsg(c.fd, bs, c.outputBarrier.ivs, false && c.supportZeroCopy) // zero-copy path deliberately disabled
if err != nil && err != syscall.EAGAIN { return Exception(err, "when flush") }
if n > 0 { c.outputBuffer.Skip(n); c.outputBuffer.Release() }
if c.outputBuffer.IsEmpty() { return nil }
// register for writable events
if err = c.operator.Control(PollR2RW); err != nil { return Exception(err, "when flush") }
err = <-c.writeTrigger
return err
}
5. netpoll client side
The client API provides a Dialer interface that abstracts TCP and Unix‑domain socket connections. The core of DialTCP creates a non‑blocking socket, sets default options, and performs the connect handshake.
type Dialer interface {
DialConnection(network, address string, timeout time.Duration) (connection Connection, err error)
DialTimeout(network, address string, timeout time.Duration) (conn net.Conn, err error)
}
func DialTCP(ctx context.Context, network string, laddr, raddr *TCPAddr) (*TCPConnection, error) {
if network != "tcp" && network != "tcp4" && network != "tcp6" {
return nil, &net.OpError{Op: "dial", Net: network, Err: net.UnknownNetworkError(network)}
}
if raddr == nil { return nil, &net.OpError{Op: "dial", Net: network, Err: errMissingAddress} }
sd := &sysDialer{network: network, address: raddr.String()}
c, err := sd.dialTCP(ctx, laddr, raddr)
if err != nil { return nil, &net.OpError{Op: "dial", Net: network, Err: err} }
return c, nil
}
func (sd *sysDialer) dialTCP(ctx context.Context, laddr, raddr *TCPAddr) (*TCPConnection, error) {
conn, err := internetSocket(ctx, sd.network, laddr, raddr, syscall.SOCK_STREAM, 0, "dial")
if err != nil { return nil, err }
return newTCPConnection(conn)
}
The low‑level internetSocket creates a socket via sysSocket, applies default socket options, and then calls netfd.dial, which ultimately invokes syscall.Connect in non‑blocking mode and waits for the connection to be established using the poller.
func sysSocket(family, sotype, proto int) (int, error) {
syscall.ForkLock.RLock()
s, err := syscall.Socket(family, sotype, proto)
if err == nil { syscall.CloseOnExec(s) }
syscall.ForkLock.RUnlock()
if err != nil { return -1, os.NewSyscallError("socket", err) }
if err = syscall.SetNonblock(s, true); err != nil { syscall.Close(s); return -1, os.NewSyscallError("setnonblock", err) }
return s, nil
}After the socket is connected, newTCPConnection wraps the file descriptor into a TCPConnection and runs the same connection.init logic as the server side, establishing buffers, callbacks, and registering the fd with a poller.
6. Conclusion
The article walks through the essential components of netpoll: the Multi‑Reactor architecture, the server’s accept‑loop and event handling, the poll manager that abstracts epoll/kqueue, the connection lifecycle (initialisation, read/write buffering, request processing), and the client‑side dialing logic. The source code demonstrates how Go’s runtime poller is leveraged to build a scalable, low‑latency network framework.
Tencent Cloud Developer
Official Tencent Cloud community account that brings together developers, shares practical tech insights, and fosters an influential tech exchange community.