How OpenYurt Extends Native Kubernetes to the Edge Without Code Changes

OpenYurt adds edge-computing capabilities to a vanilla Kubernetes cluster through a non-intrusive, one-click conversion that makes edge nodes autonomous. This article explains how YurtHub caches, filters, and streams API responses in a single pass using its dualReadCloser.

Alibaba Cloud Native

OpenYurt Edge Autonomy

OpenYurt extends a native Kubernetes cluster to edge nodes without modifying the core control plane. Edge nodes often operate over unreliable public networks, so they must continue to run workloads when the cloud‑edge link fails. Autonomy is provided by the yurt‑controller‑manager and the YurtHub component.

YurtHub Architecture

YurtHub acts as a transparent reverse‑proxy with a local cache. When the cloud connection is lost or a node restarts, components such as kubelet and kube-proxy obtain required data from YurtHub, preserving edge independence.

[Figure: YurtHub architecture]

Problem: Caching Streaming HTTP Responses

A naïve implementation reads the entire http.Response.Body into memory, writes it to a cache, and then forwards the bytes to the client:

func HandleResponse(rw http.ResponseWriter, resp *http.Response) {
    bodyBytes, _ := ioutil.ReadAll(resp.Body)
    go func() { cacher.Write(bodyBytes) }()
    rw.Write(bodyBytes)
}

This approach fails for two reasons:

Streaming data (e.g., Kubernetes watch requests) cannot be fully captured with a single ReadAll call.

If the data must be filtered before caching, the extra copy doubles memory usage and complicates streaming handling.
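The first failure mode can be reproduced without Kubernetes. The sketch below is illustrative only: `newEventStream`, `firstChunk`, and `wholeBody` are hypothetical helpers, and an `io.Pipe` stands in for a long-lived watch response. A single `Read` delivers the first event as soon as it is written, while `io.ReadAll` returns only after the writer closes the stream, which a real watch may never do.

```go
package main

import (
	"fmt"
	"io"
)

// newEventStream simulates a streaming response body (hypothetical helper):
// it writes n events into a pipe, one Write per event, then closes it.
func newEventStream(n int) io.ReadCloser {
	pr, pw := io.Pipe()
	go func() {
		defer pw.Close()
		for i := 1; i <= n; i++ {
			fmt.Fprintf(pw, "event-%d\n", i)
		}
	}()
	return pr
}

// firstChunk returns what a single Read delivers. This is how a streaming
// consumer sees the data: one event at a time, before the stream ends.
func firstChunk(r io.ReadCloser) string {
	defer r.Close() // unblocks the writer goroutine
	buf := make([]byte, 64)
	n, _ := r.Read(buf)
	return string(buf[:n])
}

// wholeBody uses io.ReadAll, which returns only once the stream reaches EOF.
func wholeBody(r io.ReadCloser) string {
	defer r.Close()
	b, _ := io.ReadAll(r)
	return string(b)
}

func main() {
	fmt.Printf("single Read: %q\n", firstChunk(newEventStream(3)))
	fmt.Printf("ReadAll:     %q\n", wholeBody(newEventStream(3)))
}
```

If the writer goroutine never called `Close`, `wholeBody` would block indefinitely, which is the situation a proxy faces with an open watch connection.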

Solution 1 – Simultaneous Read/Write with io.TeeReader

io.TeeReader duplicates a read stream: every byte the client reads through it is simultaneously written to a cache writer.

func HandleResponse(rw http.ResponseWriter, resp *http.Response) {
    // Duplicate resp.Body to the cache writer
    tee := io.TeeReader(resp.Body, cacher)
    // Stream data to the client while caching
    io.Copy(rw, tee)
}
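A self-contained version of the same idea, with in-memory buffers standing in for the client connection and the cache, might look like the sketch below. The helper name `mirror` and the sample payload are assumptions made for illustration; only `io.TeeReader` and `io.Copy` come from the text above.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// mirror streams payload to a "client" buffer while io.TeeReader copies
// every byte that is read into a "cache" buffer (both are stand-ins).
func mirror(payload string) (client, cached string, err error) {
	var clientBuf, cacheBuf bytes.Buffer
	tee := io.TeeReader(strings.NewReader(payload), &cacheBuf)
	if _, err = io.Copy(&clientBuf, tee); err != nil {
		return "", "", err
	}
	return clientBuf.String(), cacheBuf.String(), nil
}

func main() {
	client, cached, err := mirror(`{"kind":"Pod"}`)
	fmt.Println(client == cached, err)
}
```

Because the tee only writes what has actually been read, the cache fills at the pace of the client and no second pass over the body is needed.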

Solution 2 – Filtering Before Caching with io.Pipe

When the response must be filtered before being cached, io.Pipe gives the filter its own reader: the tee writes each chunk into the pipe's write end while the client keeps reading the original stream.

func HandleResponse(rw http.ResponseWriter, resp *http.Response) {
    pr, pw := io.Pipe()
    // Duplicate resp.Body to the pipe writer while still sending to client
    tee := io.TeeReader(resp.Body, pw)
    // Run the filter in a separate goroutine
    go func() {
        io.Copy(dataFilter, pr) // dataFilter processes the stream
    }()
    // Stream to the client
    _, err := io.Copy(rw, tee)
    // Close the pipe so the filter goroutine sees EOF (or the copy error)
    // instead of blocking forever
    pw.CloseWithError(err)
}

Combining io.TeeReader and io.Pipe enables a single pass over the response body while supporting both caching and optional filtering.
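The combined pattern can be exercised end to end with a toy filter. In this sketch, `filterAndCache` and `serve` are hypothetical names, and upper-casing stands in for whatever filtering a real proxy would apply. It is not YurtHub's code, only an illustration of the single-pass flow under those assumptions.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// filterAndCache is a stand-in filter: it upper-cases each chunk it reads
// from the pipe and appends the result to cache.
func filterAndCache(cache io.Writer, r io.Reader) error {
	buf := make([]byte, 32)
	for {
		n, err := r.Read(buf)
		if n > 0 {
			if _, werr := cache.Write(bytes.ToUpper(buf[:n])); werr != nil {
				return werr
			}
		}
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
	}
}

// serve streams body to the client unmodified while the filter goroutine
// consumes the mirrored copy through the pipe.
func serve(body string) (client, cached string, err error) {
	var clientBuf, cacheBuf bytes.Buffer
	pr, pw := io.Pipe()

	done := make(chan error, 1)
	go func() {
		ferr := filterAndCache(&cacheBuf, pr)
		pr.CloseWithError(ferr) // unblock the writer if the filter stopped early
		done <- ferr
	}()

	_, err = io.Copy(&clientBuf, io.TeeReader(strings.NewReader(body), pw))
	pw.CloseWithError(err) // a nil error reaches the reader as io.EOF
	if ferr := <-done; err == nil {
		err = ferr
	}
	return clientBuf.String(), cacheBuf.String(), err
}

func main() {
	client, cached, err := serve("pod-a\npod-b\n")
	fmt.Printf("%q %q %v\n", client, cached, err)
}
```

The pipe is unbuffered, so the client copy advances only as fast as the filter drains it. That keeps memory use flat, at the cost of coupling the two consumers' speeds.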

Implementation Detail – dualReadCloser

YurtHub uses a custom dualReadCloser that implements io.ReadCloser. Every Read writes the bytes to an internal pipe, allowing downstream components to consume the same data.

// https://github.com/openyurtio/openyurt/blob/master/pkg/yurthub/util/util.go#L156
func NewDualReadCloser(rc io.ReadCloser, isRespBody bool) (io.ReadCloser, io.ReadCloser) {
    pr, pw := io.Pipe()
    dr := &dualReadCloser{rc: rc, pw: pw, isRespBody: isRespBody}
    return dr, pr
}

type dualReadCloser struct {
    rc         io.ReadCloser
    pw         *io.PipeWriter
    isRespBody bool
}

func (dr *dualReadCloser) Read(p []byte) (n int, err error) {
    n, err = dr.rc.Read(p)
    if n > 0 {
        if _, werr := dr.pw.Write(p[:n]); werr != nil {
            klog.Errorf("dualReader: failed to write %v", werr)
            return n, werr
        }
    }
    return
}

func (dr *dualReadCloser) Close() error {
    var errs []error
    // Close the wrapped body only when it is a response body
    if dr.isRespBody {
        if err := dr.rc.Close(); err != nil {
            errs = append(errs, err)
        }
    }
    // Closing the pipe writer signals EOF to the pipe reader
    if err := dr.pw.Close(); err != nil {
        errs = append(errs, err)
    }
    if len(errs) != 0 {
        return fmt.Errorf("failed to close dualReader, %v", errs)
    }
    return nil
}
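To see how the two returned readers behave together, the following sketch uses a trimmed copy of the type (no logging and no isRespBody flag, purely so the example compiles on its own) plus a hypothetical `readBoth` helper. It illustrates the contract described above rather than reproducing YurtHub's code: the pipe reader must be drained concurrently, and closing the primary reader is what ends the mirrored stream.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// dualReadCloser here is a simplified copy of the type discussed above.
type dualReadCloser struct {
	rc io.ReadCloser
	pw *io.PipeWriter
}

// Read forwards to the wrapped reader and mirrors the bytes into the pipe.
func (dr *dualReadCloser) Read(p []byte) (int, error) {
	n, err := dr.rc.Read(p)
	if n > 0 {
		if _, werr := dr.pw.Write(p[:n]); werr != nil {
			return n, werr
		}
	}
	return n, err
}

// Close ends the mirrored stream and closes the wrapped reader.
func (dr *dualReadCloser) Close() error {
	dr.pw.Close() // the pipe reader now sees io.EOF
	return dr.rc.Close()
}

// readBoth reads body through the dual reader and returns what the primary
// reader and the pipe reader each received.
func readBoth(body string) (primary, mirrored string, err error) {
	pr, pw := io.Pipe()
	dr := &dualReadCloser{rc: io.NopCloser(strings.NewReader(body)), pw: pw}

	mirrorCh := make(chan string, 1)
	go func() {
		// The pipe is unbuffered, so this side must read concurrently,
		// otherwise the primary Read would block on the pipe write.
		b, _ := io.ReadAll(pr)
		mirrorCh <- string(b)
	}()

	b, err := io.ReadAll(dr)
	dr.Close()
	return string(b), <-mirrorCh, err
}

func main() {
	primary, mirrored, err := readBoth("kind: Pod")
	fmt.Println(primary == mirrored, err)
}
```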

Integration in the Reverse Proxy

YurtHub’s reverse‑proxy creates a dualReadCloser in the modifyResponse hook. The response body is replaced with the dual reader, and a goroutine caches the data read from the pipe.

// https://github.com/openyurtio/openyurt/blob/master/pkg/yurthub/proxy/remote/remote.go#L85
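func (rp *RemoteProxy) modifyResponse(resp *http.Response) error {
    // The excerpt elides how req and ctx are obtained; deriving them from
    // the response's originating request is assumed here.
    req := resp.Request
    ctx := req.Context()
    rc, prc := util.NewDualReadCloser(resp.Body, true)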
    go func(ctx context.Context, prc io.ReadCloser, stopCh <-chan struct{}) {
        err := rp.cacheMgr.CacheResponse(ctx, prc, stopCh)
        if err != nil && err != io.EOF && err != context.Canceled {
            klog.Errorf("%s response cache ended with error, %v", util.ReqString(req), err)
        }
    }(ctx, prc, rp.stopCh)
    resp.Body = rc
    return nil
}
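The same wiring can be tried with the standard library alone. The sketch below is an assumption-laden stand-in, not YurtHub's implementation: `cannedTransport`, `teeBody`, and `proxyOnce` are hypothetical names, the upstream is faked by a custom `http.RoundTripper` so nothing is dialed, and a `bytes.Buffer` plays the role of the cache manager. It shows a `ModifyResponse` hook on `httputil.ReverseProxy` swapping the body for a mirroring reader while a goroutine drains the mirror.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"net/http/httputil"
	"net/url"
	"strings"
	"sync"
)

// cannedTransport fakes the cloud API server with a fixed response body.
type cannedTransport struct{ payload string }

func (t cannedTransport) RoundTrip(req *http.Request) (*http.Response, error) {
	return &http.Response{
		StatusCode: http.StatusOK,
		Header:     make(http.Header),
		Body:       io.NopCloser(strings.NewReader(t.payload)),
		Request:    req,
	}, nil
}

// teeBody mirrors every Read into a pipe and closes both ends on Close.
type teeBody struct {
	io.Reader
	body io.Closer
	pw   *io.PipeWriter
}

func (t teeBody) Close() error {
	t.pw.Close() // ends the mirrored stream
	return t.body.Close()
}

// proxyOnce sends one request through the proxy and returns what the client
// received and what the caching goroutine stored.
func proxyOnce(payload string) (client, cached string) {
	var cache bytes.Buffer
	var wg sync.WaitGroup

	target, _ := url.Parse("http://upstream.invalid") // never dialed
	proxy := httputil.NewSingleHostReverseProxy(target)
	proxy.Transport = cannedTransport{payload: payload}
	proxy.ModifyResponse = func(resp *http.Response) error {
		pr, pw := io.Pipe()
		wg.Add(1)
		go func() {
			defer wg.Done()
			io.Copy(&cache, pr) // drain the mirrored stream into the cache
		}()
		resp.Body = teeBody{Reader: io.TeeReader(resp.Body, pw), body: resp.Body, pw: pw}
		return nil
	}

	rec := httptest.NewRecorder()
	proxy.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/api/v1/pods", nil))
	wg.Wait() // the proxy has closed the body, so the cache goroutine hit EOF
	return rec.Body.String(), cache.String()
}

func main() {
	client, cached := proxyOnce(`{"kind":"PodList"}`)
	fmt.Println(client == cached)
}
```

Waiting on the `sync.WaitGroup` is only needed here because the example inspects the cache right away. In a long-running proxy, the caching goroutine would simply finish on its own once the body is closed.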

Reference

Full source code and further documentation are available in the OpenYurt GitHub repository: https://github.com/openyurtio/openyurt
