How Nacos Client Publishes Service Change Events: Deep Dive into NotifyCenter

This article dissects the Nacos client's service subscription flow: how listeners are registered, how the periodic UpdateTask processes ServiceInfo, and how NotifyCenter and DefaultPublisher publish InstancesChangeEvent through a daemon thread and a blocking queue.

Senior Brother's Insights

Review of the Overall Flow

The Nacos client subscribes to a service via subscribe, which registers an EventListener. A scheduled UpdateTask fetches the latest instance list every six seconds, then calls ServiceInfoHolder#processServiceInfo to process the data and publish change events.

Event Listener Registration

Inside subscribe, the listener is registered through changeNotifier.registerListener before the client proxy subscribes to the server:

@Override
public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
        throws NacosException {
    if (null == listener) {
        return;
    }
    String clusterString = StringUtils.join(clusters, ",");
    changeNotifier.registerListener(groupName, serviceName, clusterString, listener);
    clientProxy.subscribe(serviceName, groupName, clusterString);
}

The actual registration logic resides in InstancesChangeNotifier.registerListener:

// InstancesChangeNotifier
public void registerListener(String groupName, String serviceName, String clusters, EventListener listener) {
    String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
    ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
    if (eventListeners == null) {
        synchronized (lock) {
            eventListeners = listenerMap.get(key);
            if (eventListeners == null) {
                eventListeners = new ConcurrentHashSet<EventListener>();
                // cache the listener
                listenerMap.put(key, eventListeners);
            }
        }
    }
    eventListeners.add(listener);
}

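The listeners are stored in a Map<String, ConcurrentHashSet<EventListener>>. The key is built by ServiceInfo.getKey from the grouped service name (group name plus service name) and the cluster string, and double-checked locking ensures that only one listener set is created per key.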
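
The same per-key registry can be sketched with plain JDK collections. This is a minimal illustration rather than Nacos code: ListenerRegistrySketch, its Listener interface, and the "@@" key layout are assumptions made for the example, and computeIfAbsent stands in for the explicit double-checked lock.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Simplified stand-in for a listener registry keyed by service; not Nacos code. */
final class ListenerRegistrySketch {

    /** Hypothetical listener type used only in this sketch. */
    interface Listener {
        void onChange(String serviceKey);
    }

    private final Map<String, Set<Listener>> listenerMap = new ConcurrentHashMap<>();

    /** Assumed key layout: "group@@service", plus "@@clusters" when clusters are given. */
    static String key(String groupName, String serviceName, String clusters) {
        String grouped = groupName + "@@" + serviceName;
        return (clusters == null || clusters.isEmpty()) ? grouped : grouped + "@@" + clusters;
    }

    /** Atomically creates the listener set for a key on first use, then adds the listener. */
    void register(String groupName, String serviceName, String clusters, Listener listener) {
        listenerMap
                .computeIfAbsent(key(groupName, serviceName, clusters), k -> ConcurrentHashMap.newKeySet())
                .add(listener);
    }

    int listenerCount(String groupName, String serviceName, String clusters) {
        return listenerMap
                .getOrDefault(key(groupName, serviceName, clusters), Collections.emptySet())
                .size();
    }

    public static void main(String[] args) {
        ListenerRegistrySketch registry = new ListenerRegistrySketch();
        Listener listener = serviceKey -> System.out.println("changed: " + serviceKey);
        registry.register("DEFAULT_GROUP", "orders", "", listener);
        registry.register("DEFAULT_GROUP", "orders", "", listener); // same listener, stored once
        System.out.println(registry.listenerCount("DEFAULT_GROUP", "orders", ""));
    }
}
```

Because the values are sets, registering the same listener twice for the same key has no additional effect.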

Processing ServiceInfo

When UpdateTask runs, it first looks up the cached ServiceInfo. If nothing is cached yet, it queries the server and hands the result to processServiceInfo:

// fetch cached service info
ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
if (serviceObj == null) {
    // query from server
    serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
    serviceInfoHolder.processServiceInfo(serviceObj);
    lastRefTime = serviceObj.getLastRefTime();
    return;
}

The core handling occurs in ServiceInfoHolder.processServiceInfo:

public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
    String serviceKey = serviceInfo.getKey();
    if (serviceKey == null) {
        return null;
    }
    ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
    if (isEmptyOrErrorPush(serviceInfo)) {
        // empty or error push, just ignore
        return oldService;
    }
    // cache the service info
    serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
    // detect change
    boolean changed = isChangedServiceInfo(oldService, serviceInfo);
    if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {
        serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));
    }
    // monitor size
    MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
    if (changed) {
        NAMING_LOGGER.info("current ips:(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> "
                + JacksonUtils.toJson(serviceInfo.getHosts()));
        // publish change event
        NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(),
                serviceInfo.getClusters(), serviceInfo.getHosts()));
        // write to local cache
        DiskCache.write(serviceInfo, cacheDir);
    }
    return serviceInfo;
}
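
The cache-then-compare pattern in processServiceInfo can be reduced to a few lines. The sketch below is an assumption-laden simplification: ChangeDetectionSketch is a hypothetical class, hosts are plain strings, a first-time entry counts as a change, and every empty push is ignored (the real isEmptyOrErrorPush and isChangedServiceInfo checks are not shown in this article).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Simplified cache-and-compare flow; not the Nacos implementation. */
final class ChangeDetectionSketch {

    private final Map<String, Set<String>> cache = new ConcurrentHashMap<>();

    /** Keys for which a change "event" was published, in order. */
    final List<String> publishedKeys = new ArrayList<>();

    /** Caches the new host set and records an event only when it differs from the old one. */
    Set<String> process(String serviceKey, List<String> hosts) {
        Set<String> old = cache.get(serviceKey);
        if (hosts == null || hosts.isEmpty()) {
            return old; // assumed rule: ignore empty pushes and keep the cached value
        }
        Set<String> fresh = new HashSet<>(hosts);
        cache.put(serviceKey, fresh);
        if (!fresh.equals(old)) { // old == null (first sight) also counts as changed
            publishedKeys.add(serviceKey);
        }
        return fresh;
    }

    public static void main(String[] args) {
        ChangeDetectionSketch holder = new ChangeDetectionSketch();
        holder.process("DEFAULT_GROUP@@orders", Arrays.asList("10.0.0.1:8080"));
        holder.process("DEFAULT_GROUP@@orders", Arrays.asList("10.0.0.1:8080")); // unchanged
        holder.process("DEFAULT_GROUP@@orders", Arrays.asList("10.0.0.1:8080", "10.0.0.2:8080"));
        System.out.println(holder.publishedKeys.size() + " change events");
    }
}
```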

Event Publishing via NotifyCenter

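The InstancesChangeEvent is sent through NotifyCenter.publishEvent. The lookup itself happens in the private overload below, which takes the event class as well as the event: a SlowEvent goes to the shared publisher, and any other event goes to the publisher registered under its canonical class name: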

private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {
    if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {
        return INSTANCE.sharePublisher.publish(event);
    }
    // get canonical name
    final String topic = ClassUtils.getCanonicalName(eventType);
    // fetch publisher
    EventPublisher publisher = INSTANCE.publisherMap.get(topic);
    if (publisher != null) {
        return publisher.publish(event);
    }
    LOGGER.warn("There are no [{}] publishers for this event, please register", topic);
    return false;
}
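
The topic lookup can be illustrated with a small map keyed by canonical class name. This is only a sketch: TopicRoutingSketch and its event types are hypothetical, a Predicate stands in for EventPublisher, and the SlowEvent branch is left out.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

/** Routes events to a publisher registered under the event's canonical class name. */
final class TopicRoutingSketch {

    /** Hypothetical marker interface and event types for the sketch. */
    interface Event { }

    static final class InstancesChanged implements Event { }

    static final class Unregistered implements Event { }

    // A Predicate<Event> plays the role of a publisher: it returns true when the event is accepted.
    private final Map<String, Predicate<Event>> publisherMap = new ConcurrentHashMap<>();

    void registerToPublisher(Class<? extends Event> type, Predicate<Event> publisher) {
        publisherMap.putIfAbsent(type.getCanonicalName(), publisher);
    }

    /** Returns false when no publisher is registered for the event's type. */
    boolean publishEvent(Event event) {
        String topic = event.getClass().getCanonicalName();
        Predicate<Event> publisher = publisherMap.get(topic);
        if (publisher != null) {
            return publisher.test(event);
        }
        System.err.println("There are no [" + topic + "] publishers for this event");
        return false;
    }

    public static void main(String[] args) {
        TopicRoutingSketch center = new TopicRoutingSketch();
        center.registerToPublisher(InstancesChanged.class, event -> true);
        System.out.println(center.publishEvent(new InstancesChanged()));
        System.out.println(center.publishEvent(new Unregistered()));
    }
}
```

Keying by class name means an event type must be registered before its first publish, which is why the registration step in NacosNamingService.init matters.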

The binding between event type and publisher is established during NacosNamingService.init via:

NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);

registerToPublisher uses the default factory, which creates a DefaultPublisher instance.

DefaultPublisher Mechanics

DefaultPublisher extends Thread and implements EventPublisher:

public class DefaultPublisher extends Thread implements EventPublisher {}

Its init method configures a daemon thread, sets a name, and creates a blocking queue:

@Override
public void init(Class<? extends Event> type, int bufferSize) {
    setDaemon(true);
    setName("nacos.publisher-" + type.getName());
    this.eventType = type;
    this.queueMaxSize = bufferSize;
    this.queue = new ArrayBlockingQueue<>(bufferSize);
    start();
}

The overridden start ensures the thread runs only once and marks it as initialized.

@Override
public synchronized void start() {
    if (!initialized) {
        super.start();
        if (queueMaxSize == -1) {
            queueMaxSize = ringBufferSize;
        }
        initialized = true;
    }
}

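When the thread runs, it executes openEventHandler, which contains two loops. The first waits up to 60 seconds, in one-second steps, for a subscriber to appear. The second blocks on queue.take() and dispatches each event until shutdown: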

void openEventHandler() {
    try {
        int waitTimes = 60;
        for (;;) {
            if (shutdown || hasSubscriber() || waitTimes <= 0) {
                break;
            }
            ThreadUtils.sleep(1000L);
            waitTimes--;
        }
        for (;;) {
            if (shutdown) {
                break;
            }
            final Event event = queue.take();
            receiveEvent(event);
            UPDATER.compareAndSet(this, lastEventSequence, Math.max(lastEventSequence, event.sequence()));
        }
    } catch (Throwable ex) {
        LOGGER.error("Event listener exception : ", ex);
    }
}

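The publish method of DefaultPublisher enqueues the event with a non-blocking offer. If the queue is full, it logs a warning and handles the event synchronously on the calling thread, so publish returns true either way: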

@Override
public boolean publish(Event event) {
    checkIsStart();
    boolean success = this.queue.offer(event);
    if (!success) {
        LOGGER.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event);
        receiveEvent(event);
        return true;
    }
    return true;
}
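
The consumer loop and the full-queue fallback fit together as in the following sketch. It is a simplified illustration, not DefaultPublisher itself: QueuePublisherSketch is a hypothetical class, events are plain strings, and the initial wait for subscribers is omitted.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;

/** Daemon thread draining a bounded queue, with synchronous fallback when the queue is full. */
final class QueuePublisherSketch extends Thread {

    private final BlockingQueue<String> queue;
    private final List<String> handled = new CopyOnWriteArrayList<>();
    private volatile boolean shutdown;

    QueuePublisherSketch(int bufferSize) {
        this.queue = new ArrayBlockingQueue<>(bufferSize);
        setDaemon(true); // a daemon thread does not keep the JVM alive
        setName("sketch.publisher");
    }

    @Override
    public void run() {
        try {
            while (!shutdown) {
                handle(queue.take()); // blocks until an event is available
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // shutdown() interrupts a blocked take()
        }
    }

    /** Never blocks: a full queue means the caller's thread handles the event itself. */
    boolean publish(String event) {
        if (!queue.offer(event)) {
            handle(event);
        }
        return true;
    }

    private void handle(String event) {
        handled.add(event);
    }

    List<String> handled() {
        return handled;
    }

    void shutdown() {
        shutdown = true;
        interrupt();
    }

    public static void main(String[] args) throws InterruptedException {
        QueuePublisherSketch publisher = new QueuePublisherSketch(1);
        publisher.publish("first");  // fits into the queue
        publisher.publish("second"); // queue is full, handled on the calling thread
        publisher.start();           // the consumer thread now drains "first"
        while (publisher.handled().size() < 2) {
            Thread.sleep(10);
        }
        System.out.println(publisher.handled());
        publisher.shutdown();
    }
}
```

One consequence of the fallback is that ordering is not guaranteed under load: an event handled synchronously can overtake events still waiting in the queue.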

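Event consumption happens in receiveEvent, which drops the event when there is no subscriber and lets each subscriber opt out of events older than the last processed sequence: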

void receiveEvent(Event event) {
    final long currentEventSequence = event.sequence();
    if (!hasSubscriber()) {
        LOGGER.warn("[NotifyCenter] the {} is lost, because there is no subscriber.", event.getClass());
        return;
    }
    for (Subscriber subscriber : subscribers) {
        if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {
            LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire", event.getClass());
            continue;
        }
        notifySubscriber(subscriber, event);
    }
}
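
The expiry check can be seen in isolation in the sketch below. The names are hypothetical (ExpiredEventFilterSketch, Recorder), events are reduced to their sequence numbers, and the last sequence is updated inside receive for brevity, whereas the code above updates it in the consumer loop.

```java
import java.util.ArrayList;
import java.util.List;

/** Skips out-of-date events for subscribers that opt in via ignoreExpireEvent(). */
final class ExpiredEventFilterSketch {

    /** Hypothetical subscriber contract for the sketch. */
    interface Subscriber {
        boolean ignoreExpireEvent();

        void onEvent(long sequence);
    }

    /** Records every sequence it is notified about. */
    static final class Recorder implements Subscriber {
        final List<Long> seen = new ArrayList<>();
        private final boolean ignoreExpired;

        Recorder(boolean ignoreExpired) {
            this.ignoreExpired = ignoreExpired;
        }

        @Override
        public boolean ignoreExpireEvent() {
            return ignoreExpired;
        }

        @Override
        public void onEvent(long sequence) {
            seen.add(sequence);
        }
    }

    private final List<Subscriber> subscribers = new ArrayList<>();
    private long lastEventSequence = -1L;

    void addSubscriber(Subscriber subscriber) {
        subscribers.add(subscriber);
    }

    void receive(long sequence) {
        for (Subscriber subscriber : subscribers) {
            if (subscriber.ignoreExpireEvent() && lastEventSequence > sequence) {
                continue; // a newer event was already processed
            }
            subscriber.onEvent(sequence);
        }
        lastEventSequence = Math.max(lastEventSequence, sequence);
    }

    public static void main(String[] args) {
        ExpiredEventFilterSketch publisher = new ExpiredEventFilterSketch();
        Recorder strict = new Recorder(true);
        Recorder lenient = new Recorder(false);
        publisher.addSubscriber(strict);
        publisher.addSubscriber(lenient);
        publisher.receive(5L);
        publisher.receive(3L); // older than 5: skipped for the strict subscriber only
        publisher.receive(7L);
        System.out.println("strict=" + strict.seen + " lenient=" + lenient.seen);
    }
}
```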

Subscribers are registered during NacosNamingService.init via:

// Register subscriber to publisher
NotifyCenter.registerSubscriber(changeNotifier);

The underlying addSubscriber method links the event type, publisher, and subscriber:

private static void addSubscriber(final Subscriber consumer, Class<? extends Event> subscribeType,
        EventPublisherFactory factory) {
    final String topic = ClassUtils.getCanonicalName(subscribeType);
    synchronized (NotifyCenter.class) {
        MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, factory, subscribeType, ringBufferSize);
    }
    EventPublisher publisher = INSTANCE.publisherMap.get(topic);
    if (publisher instanceof ShardedEventPublisher) {
        ((ShardedEventPublisher) publisher).addSubscriber(consumer, subscribeType);
    } else {
        publisher.addSubscriber(consumer);
    }
}

When notifying, notifySubscriber either uses the subscriber’s custom Executor or runs the callback directly:

@Override
public void notifySubscriber(final Subscriber subscriber, final Event event) {
    LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber);
    final Runnable job = () -> subscriber.onEvent(event);
    final Executor executor = subscriber.executor();
    if (executor != null) {
        executor.execute(job);
    } else {
        try {
            job.run();
        } catch (Throwable e) {
            LOGGER.error("Event callback exception: ", e);
        }
    }
}
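
The two dispatch paths can be exercised with a small stand-alone sketch. NotifySketch and its Recording subscriber are hypothetical names, and events are plain strings; only java.util.concurrent.Executor is a real JDK type here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

/** Runs a subscriber callback on its own executor if it has one, otherwise inline. */
final class NotifySketch {

    /** Hypothetical subscriber contract for the sketch. */
    interface Subscriber {
        Executor executor(); // may return null

        void onEvent(String event);
    }

    /** Records events; the executor is supplied by the caller and may be null. */
    static final class Recording implements Subscriber {
        final List<String> seen = new ArrayList<>();
        private final Executor executor;

        Recording(Executor executor) {
            this.executor = executor;
        }

        @Override
        public Executor executor() {
            return executor;
        }

        @Override
        public void onEvent(String event) {
            seen.add(event);
        }
    }

    static void notifySubscriber(Subscriber subscriber, String event) {
        Runnable job = () -> subscriber.onEvent(event);
        Executor executor = subscriber.executor();
        if (executor != null) {
            executor.execute(job); // the callback runs wherever the executor decides
        } else {
            try {
                job.run(); // inline, on the publisher's thread
            } catch (Throwable t) {
                System.err.println("Event callback exception: " + t);
            }
        }
    }

    public static void main(String[] args) {
        List<Runnable> pending = new ArrayList<>();
        Recording deferred = new Recording(pending::add); // executor that only queues jobs
        Recording inline = new Recording(null);
        notifySubscriber(deferred, "e1");
        notifySubscriber(inline, "e2");
        System.out.println("before run: deferred=" + deferred.seen + " inline=" + inline.seen);
        pending.forEach(Runnable::run);
        System.out.println("after run: deferred=" + deferred.seen);
    }
}
```

A subscriber without an executor runs on the publisher's thread, so a slow callback delays every event queued behind it. Supplying an executor moves that cost off the publisher thread.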

Summary

The Nacos client’s service subscription mechanism relies on a periodic task that updates ServiceInfo, detects changes, and publishes an InstancesChangeEvent through NotifyCenter. NotifyCenter maps event types to EventPublisher instances, with the default being a daemon DefaultPublisher that queues events in a blocking queue and processes them in a dedicated thread, ultimately invoking registered listeners.

Written by

Senior Brother's Insights

A public account focused on workplace, career growth, team management, and self-improvement. The author is the writer of books including 'SpringBoot Technology Insider' and 'Drools 8 Rule Engine: Core Technology and Practice'.
