Batching Cluster Endpoint Requests in Go: Reduce Load with Minimal Code
This article explains how to batch multiple cluster endpoint requests in Go by using a token‑based queue and concurrent workers, dramatically reducing load on the registration center while requiring minimal changes to existing code.
Background
The previous article described a system refactor that required sequentially fetching cluster address information for each address to be removed, which caused a large number of requests to the registration center when many machines were being removed.
Difficulty
The challenge is to merge these requests with as little code change as possible.
Solution
The approach borrows ideas from Go's HTTP client implementation. The Read data‑source interface remains unchanged, so upper‑level business code does not need to be modified; only the implementation of ListClusterEndpoints is replaced.
We use a queue to enqueue each request, block the caller, and let a set of goroutines fetch a batch of request parameters, issue a bulk request, and then wake the blocked callers.
We first define a token that can be waited on and completed by other goroutines:
type token struct {
value interface{}
err error
}
type Token chan token
func NewToken() Token {
return make(Token, 1)
}
func (t Token) Done(value interface{}, err error) {
t <- token{value: value, err: err}
}
func (t Token) Wait(timeout time.Duration) (value interface{}, err error) {
if timeout <= 0 {
tk := <-t
return tk.value, tk.err
}
select {
case tk := <-t:
return tk.value, tk.err
case <-time.After(timeout):
return nil, ErrTokenTimeout
}
}Next we define the data source and its parameters:
type DataSource struct {
paramCh chan param
readTimeout time.Duration
concurrency int
step int
}
type param struct {
cuuid string
token Token
}We replace the original ListClusterEndpoints implementation with a version that enqueues the request and waits for the token result:
func (p *DataSource) ListClusterEndpoints(ctx context.Context, cuuid string) ([]ptypes.Endpoint, error) {
req := param{cuuid: cuuid, token: NewToken()}
select {
case p.paramCh <- req:
default:
return nil, fmt.Errorf("list cluster endpoints write channel failed")
}
value, err := req.token.Wait(p.readTimeout)
if err != nil {
return nil, err
}
eps, ok := value.([]ptypes.Endpoint)
if !ok {
return nil, fmt.Errorf("value is not endpoints")
}
return eps, nil
}We start a pool of goroutines to process queued requests:
func (p *DataSource) startListClusterEndpointsLoop() {
for i := 0; i < p.concurrency; i++ {
go func() {
for {
reqs := p.getListClusterEndpointsReqFromChan()
p.doBatchListClusterEndpoints(reqs)
}
}()
}
}The core function getListClusterEndpointsReqFromChan retrieves a batch of parameters without busy‑waiting:
func (p *DataSource) getListClusterEndpointsReqFromChan() []param {
reqs := make([]param, 0)
select {
case req := <-p.paramCh:
reqs = append(reqs, req)
for i := 1; i < p.step; i++ {
select {
case reqNext := <-p.paramCh:
reqs = append(reqs, reqNext)
default:
break
}
}
}
return reqs
}Final Notes
While the method is simple, it requires proper monitoring of per‑request QPS and latency, batch request QPS and latency, queue write failures, and queue length to adjust the number of processing goroutines appropriately.
Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactand we will review it promptly.
Xiao Lou's Tech Notes
Backend technology sharing, architecture design, performance optimization, source code reading, troubleshooting, and pitfall practices
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
