How OpenYurt Extends Native Kubernetes to the Edge Without Code Changes
OpenYurt adds edge‑computing capabilities to a vanilla Kubernetes cluster by providing a non‑intrusive, one‑click conversion that enables autonomous edge nodes, with YurtHub and dual‑read‑closer mechanisms handling cached, filtered, and streamed data efficiently.
OpenYurt Edge Autonomy
OpenYurt extends a native Kubernetes cluster to edge nodes without modifying the core control plane. Edge nodes often operate over unreliable public networks, so they must continue to run workloads when the cloud‑edge link fails. Autonomy is provided by the yurt‑controller‑manager and the YurtHub component.
YurtHub Architecture
YurtHub acts as a transparent reverse‑proxy with a local cache. When the cloud connection is lost or a node restarts, components such as kubelet and kube-proxy obtain required data from YurtHub, preserving edge independence.
Problem: Caching Streaming HTTP Responses
A naïve implementation reads the entire http.Response.Body into memory, writes it to a cache, and then forwards the bytes to the client:
func HandleResponse(rw http.ResponseWriter, resp *http.Response) {
bodyBytes, _ := ioutil.ReadAll(resp.Body)
go func() { cacher.Write(bodyBytes) }()
rw.Write(bodyBytes)
}This approach fails for two reasons:
Streaming data (e.g., Kubernetes watch requests) cannot be fully captured with a single ReadAll call.
If the data must be filtered before caching, the extra copy doubles memory usage and complicates streaming handling.
Solution 1 – Simultaneous Read/Write with io.TeeReader
io.TeeReaderduplicates a read stream: data read by the client is simultaneously written to a cache writer.
func HandleResponse(rw http.ResponseWriter, resp *http.Response) {
// Duplicate resp.Body to the cache writer
tee := io.TeeReader(resp.Body, cacher)
// Stream data to the client while caching
io.Copy(rw, tee)
}Solution 2 – Filtering Before Caching with io.Pipe
When the response must be filtered before being cached, a pipe can split the stream into two readers: one for the filter and one for the client.
func HandleResponse(rw http.ResponseWriter, resp *http.Response) {
pr, pw := io.Pipe()
// Duplicate resp.Body to the pipe writer while still sending to client
tee := io.TeeReader(resp.Body, pw)
// Run the filter in a separate goroutine
go func() {
io.Copy(dataFilter, pr) // dataFilter processes the stream
}()
// Stream to the client
io.Copy(rw, tee)
}Combining io.TeeReader and io.Pipe enables a single pass over the response body while supporting both caching and optional filtering.
Implementation Detail – dualReadCloser
YurtHub uses a custom dualReadCloser that implements io.ReadCloser. Every Read writes the bytes to an internal pipe, allowing downstream components to consume the same data.
// https://github.com/openyurtio/openyurt/blob/master/pkg/yurthub/util/util.go#L156
func NewDualReadCloser(rc io.ReadCloser, isRespBody bool) (io.ReadCloser, io.ReadCloser) {
pr, pw := io.Pipe()
dr := &dualReadCloser{rc: rc, pw: pw, isRespBody: isRespBody}
return dr, pr
}
type dualReadCloser struct {
rc io.ReadCloser
pw *io.PipeWriter
isRespBody bool
}
func (dr *dualReadCloser) Read(p []byte) (n int, err error) {
n, err = dr.rc.Read(p)
if n > 0 {
if _, werr := dr.pw.Write(p[:n]); werr != nil {
klog.Errorf("dualReader: failed to write %v", werr)
return n, werr
}
}
return
}
func (dr *dualReadCloser) Close() error {
var errs []error
if dr.isRespBody {
if err := dr.rc.Close(); err != nil { errs = append(errs, err) }
}
if err := dr.pw.Close(); err != nil { errs = append(errs, err) }
if len(errs) != 0 { return fmt.Errorf("failed to close dualReader, %v", errs) }
return nil
}Integration in the Reverse Proxy
YurtHub’s reverse‑proxy creates a dualReadCloser in the modifyResponse hook. The response body is replaced with the dual reader, and a goroutine caches the data read from the pipe.
// https://github.com/openyurtio/openyurt/blob/master/pkg/yurthub/proxy/remote/remote.go#L85
func (rp *RemoteProxy) modifyResponse(resp *http.Response) error {
rc, prc := util.NewDualReadCloser(resp.Body, true)
go func(ctx context.Context, prc io.ReadCloser, stopCh <-chan struct{}) {
err := rp.cacheMgr.CacheResponse(ctx, prc, stopCh)
if err != nil && err != io.EOF && err != context.Canceled {
klog.Errorf("%s response cache ended with error, %v", util.ReqString(req), err)
}
}(ctx, prc, rp.stopCh)
resp.Body = rc
return nil
}Reference
Full source code and further documentation are available in the OpenYurt GitHub repository: https://github.com/openyurtio/openyurt
Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactand we will review it promptly.
Alibaba Cloud Native
We publish cloud-native tech news, curate in-depth content, host regular events and live streams, and share Alibaba product and user case studies. Join us to explore and share the cloud-native insights you need.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
