
How Yurthub Implements Data Filtering in OpenYurt: Architecture and Code Walkthrough

Yurthub is a non-intrusive edge-computing component of OpenYurt. It acts as a caching proxy in front of kube-apiserver, which keeps edge nodes stable while offline and reduces load on the API server. This article walks through its four data-filtering rules (ServiceTopologyFilter, EndpointsFilter, MasterServiceFilter, and DiscardCloudService), with implementation code and operational considerations for each.

Alibaba Cloud Native

Yurthub Data Filtering Framework

Yurthub is a kube-apiserver proxy with a cache layer. It lets edge nodes serve requests from the local cache when they are offline, and it reduces the load that large list/watch operations place on the cloud API server.

Filtering Rules

1. ServiceTopologyFilter

Targets EndpointSlice resources (Kubernetes v1.18+). It extracts the service name from the kubernetes.io/service-name label, reads the openyurt.io/topologyKeys annotation on that Service, and filters endpoints according to the annotation value:

kubernetes.io/hostname: keep only endpoints on the same node.

openyurt.io/nodepool or openyurt.io/zone: keep only endpoints that belong to the same node‑pool or zone.

If the Service cannot be found or carries no topology annotation, the EndpointSlice is returned unchanged.

func (fh *serviceTopologyFilterHandler) reassembleEndpointSlice(endpointSlice *discovery.EndpointSlice) *discovery.EndpointSlice {
    var serviceTopologyType string
    if svcName, ok := endpointSlice.Labels[discovery.LabelServiceName]; ok {
        svc, err := fh.serviceLister.Services(endpointSlice.Namespace).Get(svcName)
        if err != nil {
            klog.Infof("skip reassemble endpointSlice, failed to get service %s/%s, err: %v", endpointSlice.Namespace, svcName, err)
            return endpointSlice
        }
        if serviceTopologyType, ok = svc.Annotations[AnnotationServiceTopologyKey]; !ok {
            klog.Infof("skip reassemble endpointSlice, service %s/%s has no annotation %s", endpointSlice.Namespace, svcName, AnnotationServiceTopologyKey)
            return endpointSlice
        }
    }
    var newEps []discovery.Endpoint
    if serviceTopologyType == AnnotationServiceTopologyValueNode {
        for i := range endpointSlice.Endpoints {
            if endpointSlice.Endpoints[i].Topology[v1.LabelHostname] == fh.nodeName {
                newEps = append(newEps, endpointSlice.Endpoints[i])
            }
        }
        endpointSlice.Endpoints = newEps
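    } else if serviceTopologyType == AnnotationServiceTopologyValueNodePool || serviceTopologyType == AnnotationServiceTopologyValueZone {
        // Node-pool and zone topology share one code path: both keep
        // endpoints whose host is in the current node's pool.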
        currentNode, err := fh.nodeGetter(fh.nodeName)
        if err != nil {
            klog.Infof("skip reassemble endpointSlice, failed to get current node %s, err: %v", fh.nodeName, err)
            return endpointSlice
        }
        if nodePoolName, ok := currentNode.Labels[nodepoolv1alpha1.LabelCurrentNodePool]; ok {
            nodePool, err := fh.nodePoolLister.Get(nodePoolName)
            if err != nil {
                klog.Infof("skip reassemble endpointSlice, failed to get nodepool %s, err: %v", nodePoolName, err)
                return endpointSlice
            }
            for i := range endpointSlice.Endpoints {
                if inSameNodePool(endpointSlice.Endpoints[i].Topology[v1.LabelHostname], nodePool.Status.Nodes) {
                    newEps = append(newEps, endpointSlice.Endpoints[i])
                }
            }
            endpointSlice.Endpoints = newEps
        }
    }
    return endpointSlice
}
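The listing above calls inSameNodePool without showing it. A plausible implementation is a linear membership test over the pool's node names, sketched below. The helper keepHostsInPool is hypothetical and works on plain hostnames rather than discovery.Endpoint values, so that the example compiles without Kubernetes dependencies.

```go
package main

// inSameNodePool reports whether nodeName is one of the nodes in nodeList.
// Assumption: nodeList is the list of node names recorded in the pool status.
func inSameNodePool(nodeName string, nodeList []string) bool {
	for _, n := range nodeList {
		if n == nodeName {
			return true
		}
	}
	return false
}

// keepHostsInPool is a hypothetical helper that mirrors the filtering loop:
// it returns only the endpoint hostnames that belong to the pool.
func keepHostsInPool(endpointHosts, poolNodes []string) []string {
	var kept []string
	for _, h := range endpointHosts {
		if inSameNodePool(h, poolNodes) {
			kept = append(kept, h)
		}
	}
	return kept
}
```

For pools with many nodes, building a set (map[string]struct{}) once per EndpointSlice would avoid the repeated linear scans.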

2. EndpointsFilter

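Operates on Endpoints resources. It first checks that a Service with the same name exists, then reads the current node's pool from the node label apps.openyurt.io/nodepool, looks up the nodes in that pool, and rebuilds Endpoints.Subsets so that they contain only addresses in the same pool. If the node has no pool label, the Endpoints object is returned unchanged; if no subset remains after filtering, the handler returns nil.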

func (fh *endpointsFilterHandler) reassembleEndpoint(endpoints *v1.Endpoints) *v1.Endpoints {
    svcName := endpoints.Name
    _, err := fh.serviceLister.Services(endpoints.Namespace).Get(svcName)
    if err != nil {
        klog.Infof("skip reassemble endpoints, failed to get service %s/%s, err: %v", endpoints.Namespace, svcName, err)
        return endpoints
    }
    currentNode, err := fh.nodeGetter(fh.nodeName)
    if err != nil {
        klog.Infof("skip reassemble endpoints, failed to get current node %s, err: %v", fh.nodeName, err)
        return endpoints
    }
    if nodePoolName, ok := currentNode.Labels[nodepoolv1alpha1.LabelCurrentNodePool]; ok {
        nodePool, err := fh.nodePoolLister.Get(nodePoolName)
        if err != nil {
            klog.Infof("skip reassemble endpoints, failed to get nodepool %s, err: %v", nodePoolName, err)
            return endpoints
        }
        var newEpSubsets []v1.EndpointSubset
        for i := range endpoints.Subsets {
            endpoints.Subsets[i].Addresses = filterValidEndpointsAddr(endpoints.Subsets[i].Addresses, nodePool)
            endpoints.Subsets[i].NotReadyAddresses = filterValidEndpointsAddr(endpoints.Subsets[i].NotReadyAddresses, nodePool)
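            // Keep the subset only if some address survived. This assumes
            // filterValidEndpointsAddr returns nil (not an empty slice) when nothing matches.
            if endpoints.Subsets[i].Addresses != nil || endpoints.Subsets[i].NotReadyAddresses != nil {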
                newEpSubsets = append(newEpSubsets, endpoints.Subsets[i])
            }
        }
        endpoints.Subsets = newEpSubsets
        if len(endpoints.Subsets) == 0 {
            return nil
        }
    }
    return endpoints
}
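The helper filterValidEndpointsAddr is not shown in the article. The sketch below is one way it could work, under two assumptions: each address carries an optional node name (as v1.EndpointAddress does with its NodeName *string field), and the pool is given as a plain list of node names instead of a NodePool object. The endpointAddress type is a local stand-in so that the example has no Kubernetes imports.

```go
package main

// endpointAddress is a local stand-in for the two fields of
// v1.EndpointAddress that matter for this filter.
type endpointAddress struct {
	IP       string
	NodeName *string
}

// filterValidEndpointsAddr keeps only addresses whose node is in poolNodes.
// It returns nil when nothing matches, which is what the nil checks in
// reassembleEndpoint rely on.
func filterValidEndpointsAddr(addrs []endpointAddress, poolNodes []string) []endpointAddress {
	inPool := make(map[string]struct{}, len(poolNodes))
	for _, n := range poolNodes {
		inPool[n] = struct{}{}
	}
	var kept []endpointAddress // stays nil if no address is appended
	for _, a := range addrs {
		if a.NodeName == nil {
			continue // no node information: cannot be matched to the pool
		}
		if _, ok := inPool[*a.NodeName]; ok {
			kept = append(kept, a)
		}
	}
	return kept
}
```

Dropping addresses that lack a node name is a design choice of this sketch; a real filter might prefer to keep them.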

3. MasterServiceFilter

Rewrites the ClusterIP and port of the master service to the host and port held by the handler (fh.host and fh.port), so that edge‑side pods using InClusterConfig reach cluster resources through that address.

func (fh *masterServiceFilterHandler) ObjectResponseFilter(b []byte) ([]byte, error) {
    list, err := fh.serializer.Decode(b)
    if err != nil || list == nil {
        klog.Errorf("skip filter, failed to decode response in ObjectResponseFilter of masterServiceFilterHandler, %v", err)
        return b, nil
    }
    serviceList, ok := list.(*v1.ServiceList)
    if !ok {
        return b, nil
    }
    for i := range serviceList.Items {
        if serviceList.Items[i].Namespace == MasterServiceNamespace && serviceList.Items[i].Name == MasterServiceName {
            serviceList.Items[i].Spec.ClusterIP = fh.host
            for j := range serviceList.Items[i].Spec.Ports {
                if serviceList.Items[i].Spec.Ports[j].Name == MasterServicePortName {
                    serviceList.Items[i].Spec.Ports[j].Port = fh.port
                    break
                }
            }
            klog.V(2).Infof("mutate master service into ClusterIP:Port=%s:%d for request %s", fh.host, fh.port, util.ReqString(fh.req))
            break
        }
    }
    return fh.serializer.Encode(serviceList)
}
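Why rewriting one Service is enough becomes clearer from how in-cluster clients find the API server: client-go's rest.InClusterConfig builds its host from the KUBERNETES_SERVICE_HOST and KUBERNETES_SERVICE_PORT environment variables, which reflect the master service's ClusterIP and port. The sketch below reproduces only that address-building step with the standard library. The function name apiServerURL and the injected getenv parameter are illustrative choices, not client-go API.

```go
package main

import (
	"errors"
	"net"
)

// apiServerURL builds the API server address the way an in-cluster client
// would: from the two service environment variables. getenv is injected
// (os.Getenv in a real program) so the function is easy to exercise.
func apiServerURL(getenv func(string) string) (string, error) {
	host := getenv("KUBERNETES_SERVICE_HOST")
	port := getenv("KUBERNETES_SERVICE_PORT")
	if host == "" || port == "" {
		return "", errors.New("KUBERNETES_SERVICE_HOST and KUBERNETES_SERVICE_PORT must be set")
	}
	// JoinHostPort also brackets IPv6 literals correctly.
	return "https://" + net.JoinHostPort(host, port), nil
}
```

If the values seen by the pod come from the mutated Service, the resulting URL points at the rewritten address instead of the cloud API server, without any change to the pod's own code.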

4. DiscardCloudService

Removes services that are not useful for edge nodes:

LoadBalancer services (unless annotated with filter.SkipDiscardServiceAnnotation=true).

ClusterIP services listed in the cloudClusterIPService map (e.g., internal tunnel services).

func (fh *discardCloudServiceFilterHandler) ObjectResponseFilter(b []byte) ([]byte, error) {
    list, err := fh.serializer.Decode(b)
    if err != nil || list == nil {
        klog.Errorf("skip filter, failed to decode response in ObjectResponseFilter of discardCloudServiceFilterHandler %v", err)
        return b, nil
    }
    serviceList, ok := list.(*v1.ServiceList)
    if ok {
        var svcNew []v1.Service
        for i := range serviceList.Items {
            nsName := fmt.Sprintf("%s/%s", serviceList.Items[i].Namespace, serviceList.Items[i].Name)
            if serviceList.Items[i].Spec.Type == v1.ServiceTypeLoadBalancer {
                if serviceList.Items[i].Annotations[filter.SkipDiscardServiceAnnotation] != "true" {
                    klog.V(2).Infof("load balancer service(%s) is discarded in ObjectResponseFilter of discardCloudServiceFilterHandler", nsName)
                    continue
                }
            }
            if _, ok := cloudClusterIPService[nsName]; ok {
                klog.V(2).Infof("clusterIP service(%s) is discarded in ObjectResponseFilter of discardCloudServiceFilterHandler", nsName)
                continue
            }
            svcNew = append(svcNew, serviceList.Items[i])
        }
        serviceList.Items = svcNew
        return fh.serializer.Encode(serviceList)
    }
    return b, nil
}
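The two discard rules can be isolated into a single predicate, which makes them easier to reason about and test. The sketch below uses a simplified service type; the annotation key and the entry in cloudClusterIPService are hypothetical placeholders, not the values used by OpenYurt.

```go
package main

// service is a simplified stand-in for v1.Service.
type service struct {
	Namespace   string
	Name        string
	Type        string
	Annotations map[string]string
}

// skipDiscardAnnotation is a hypothetical key standing in for
// filter.SkipDiscardServiceAnnotation.
const skipDiscardAnnotation = "example.io/skip-discard"

// cloudClusterIPService lists namespace/name keys of ClusterIP services
// that only make sense on the cloud side. The entry is a placeholder.
var cloudClusterIPService = map[string]struct{}{
	"kube-system/example-tunnel-internal": {},
}

// shouldDiscard reports whether svc should be hidden from edge nodes.
func shouldDiscard(svc service) bool {
	// Rule 1: LoadBalancer services are dropped unless explicitly exempted.
	// Reading from a nil Annotations map is safe and yields "".
	if svc.Type == "LoadBalancer" && svc.Annotations[skipDiscardAnnotation] != "true" {
		return true
	}
	// Rule 2: ClusterIP services on the cloud-only list are dropped.
	_, listed := cloudClusterIPService[svc.Namespace+"/"+svc.Name]
	return listed
}
```

As in the original handler, an exempted LoadBalancer service is still checked against the cloud-only list.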

Current Limitations

Filter registration is hard‑coded in the source, so adding a filter for a new resource requires a code change and a restart of Yurthub.

Proposed Dynamic Configuration

Solution 1 – Flags / Environment Variables

Filters are passed via startup flags (e.g., --filter_serviceTopology=...). This approach requires complex flag syntax and cannot be updated without restarting Yurthub.

--filter_serviceTopology=coredns/endpointslices#list,kube-proxy/services#list;watch --filter_endpointsFilter=nginx-ingress-controller/endpoints#list;watch

Solution 2 – ConfigMap‑Based Configuration

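Store filter definitions in a ConfigMap. Each entry has the form component/resource#verbs, with verbs separated by semicolons and multiple entries separated by commas. An Informer watching the ConfigMap updates the filters in real time, without restarting Yurthub.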

filter_endpoints: coredns/endpoints#list;watch,test/endpoints#list;watch
filter_servicetopology: coredns/endpointslices#list;watch
filter_discardcloudservice: ""
filter_masterservice: ""

Development Issue – Cache Synchronization

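The Informer that watches the ConfigMap connects through Yurthub's own proxy address, so other requests may arrive before the ConfigMap has been synchronized and would then be returned unfiltered. To prevent this, cache.WaitForCacheSync blocks the approval decision until the ConfigMap cache is ready. A whitelist exempts the Informer's own ConfigMap list/watch requests from filtering, and because the whitelist is checked first, those requests are never held up by the wait.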

func (a *approver) Approve(comp, resource, verb string) bool {
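    // Whitelisted requests (the ConfigMap list/watch itself) are never
    // filtered and must not wait for the sync below.
    if a.isWhitelistReq(comp, resource, verb) {
        return false
    }
    // Block until the ConfigMap informer has synced. WaitForCacheSync
    // returns false only when stopCh is closed before the sync completes.
    if ok := cache.WaitForCacheSync(a.stopCh, a.configMapSynced); !ok {
        panic("wait for configMap cache sync timeout")
    }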
    a.Lock()
    defer a.Unlock()
    for _, requests := range a.nameToRequests {
        for _, request := range requests {
            if request.Equal(comp, resource, verb) {
                return true
            }
        }
    }
    return false
}
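The ordering in Approve (whitelist first, then wait, then lookup) can be reproduced without client-go to see why it avoids a deadlock. In the sketch below a channel that is closed once the configuration has loaded plays the role of cache.WaitForCacheSync. All names are hypothetical, and unlike the original, this version returns false instead of panicking when shutdown happens before the sync.

```go
package main

import "sync"

// reqKey identifies a request by component, resource, and verb.
type reqKey struct{ comp, resource, verb string }

// approver decides whether a request should be filtered.
type approver struct {
	mu        sync.Mutex
	whitelist []reqKey      // requests that must never be filtered or delayed
	filtered  []reqKey      // requests that filters apply to
	synced    chan struct{} // closed once the filter configuration is loaded
	stop      chan struct{} // closed on shutdown
}

// approve reports whether the request should go through a filter.
func (a *approver) approve(comp, resource, verb string) bool {
	req := reqKey{comp, resource, verb}
	// 1. Whitelisted requests return immediately. If the request that
	//    loads the configuration had to wait for the configuration,
	//    nothing would ever make progress.
	for _, w := range a.whitelist {
		if w == req {
			return false
		}
	}
	// 2. Everything else waits until the configuration is available.
	select {
	case <-a.synced:
	case <-a.stop:
		return false // shutting down: do not filter
	}
	// 3. Look the request up under the lock.
	a.mu.Lock()
	defer a.mu.Unlock()
	for _, f := range a.filtered {
		if f == req {
			return true
		}
	}
	return false
}
```

The component that loads the configuration would close synced exactly once after its first successful load; updates after that only need the mutex.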

Conclusion

With dynamic filter configuration, Yurthub functions as more than a caching reverse proxy; it provides lifecycle management for edge Kubernetes nodes and can be used as a permanent component on any node to improve performance and stability.
