How Nacos Client Publishes Service Change Events: Deep Dive into NotifyCenter
This article dissects Nacos client’s service subscription flow, detailing how the periodic UpdateTask triggers event registration, processes ServiceInfo, and uses NotifyCenter with DefaultPublisher to publish InstancesChangeEvent through a guarded thread and blocking queue mechanism.
Review of the Overall Flow
The Nacos client subscribes to a service via subscribe, which registers an EventListener. A scheduled UpdateTask fetches the latest instance list every six seconds, then calls ServiceInfoHolder#processServiceInfo to handle the data and fire events.
Event Listener Registration
In subscribe the listener is registered through changeNotifier.registerListener:
@Override
public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
throws NacosException {
if (null == listener) {
return;
}
String clusterString = StringUtils.join(clusters, ",");
changeNotifier.registerListener(groupName, serviceName, clusterString, listener);
clientProxy.subscribe(serviceName, groupName, clusterString);
}The actual registration logic resides in InstancesChangeNotifier.registerListener:
// InstancesChangeNotifier
public void registerListener(String groupName, String serviceName, String clusters, EventListener listener) {
String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
if (eventListeners == null) {
synchronized (lock) {
eventListeners = listenerMap.get(key);
if (eventListeners == null) {
eventListeners = new ConcurrentHashSet<EventListener>();
// cache the listener
listenerMap.put(key, eventListeners);
}
}
}
eventListeners.add(listener);
}The listeners are stored in a Map<String, Set<EventListener>> where the key is a concatenation of service information.
Processing ServiceInfo
When UpdateTask obtains a fresh instance list, it retrieves the cached ServiceInfo and, if missing, queries the server:
// fetch cached service info
ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
if (serviceObj == null) {
// query from server
serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
serviceInfoHolder.processServiceInfo(serviceObj);
lastRefTime = serviceObj.getLastRefTime();
return;
}The core handling occurs in ServiceInfoHolder.processServiceInfo:
public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
String serviceKey = serviceInfo.getKey();
if (serviceKey == null) {
return null;
}
ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
if (isEmptyOrErrorPush(serviceInfo)) {
// empty or error push, just ignore
return oldService;
}
// cache the service info
serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
// detect change
boolean changed = isChangedServiceInfo(oldService, serviceInfo);
if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {
serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));
}
// monitor size
MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
if (changed) {
NAMING_LOGGER.info("current ips:(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> "
+ JacksonUtils.toJson(serviceInfo.getHosts()));
// publish change event
NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(),
serviceInfo.getClusters(), serviceInfo.getHosts()));
// write to local cache
DiskCache.write(serviceInfo, cacheDir);
}
return serviceInfo;
}Event Publishing via NotifyCenter
The InstancesChangeEvent is sent through NotifyCenter.publishEvent:
private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {
if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {
return INSTANCE.sharePublisher.publish(event);
}
// get canonical name
final String topic = ClassUtils.getCanonicalName(eventType);
// fetch publisher
EventPublisher publisher = INSTANCE.publisherMap.get(topic);
if (publisher != null) {
return publisher.publish(event);
}
LOGGER.warn("There are no [{}] publishers for this event, please register", topic);
return false;
}The binding between event type and publisher is established during NacosNamingService.init via:
NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384); registerToPublisheruses the default factory, which creates a DefaultPublisher instance.
DefaultPublisher Mechanics
DefaultPublisherextends Thread and implements EventPublisher:
public class DefaultPublisher extends Thread implements EventPublisher {}Its init method configures a daemon thread, sets a name, and creates a blocking queue:
@Override
public void init(Class<? extends Event> type, int bufferSize) {
setDaemon(true);
setName("nacos.publisher-" + type.getName());
this.eventType = type;
this.queueMaxSize = bufferSize;
this.queue = new ArrayBlockingQueue<>(bufferSize);
start();
}The overridden start ensures the thread runs only once and marks it as initialized.
@Override
public synchronized void start() {
if (!initialized) {
super.start();
if (queueMaxSize == -1) {
queueMaxSize = ringBufferSize;
}
initialized = true;
}
}When the thread runs, it executes openEventHandler, which contains two loops:
void openEventHandler() {
try {
int waitTimes = 60;
for (;;) {
if (shutdown || hasSubscriber() || waitTimes <= 0) {
break;
}
ThreadUtils.sleep(1000L);
waitTimes--;
}
for (;;) {
if (shutdown) {
break;
}
final Event event = queue.take();
receiveEvent(event);
UPDATER.compareAndSet(this, lastEventSequence, Math.max(lastEventSequence, event.sequence()));
}
} catch (Throwable ex) {
LOGGER.error("Event listener exception : ", ex);
}
}The publish method of DefaultPublisher enqueues the event, falling back to immediate handling if the queue is full:
@Override
public boolean publish(Event event) {
checkIsStart();
boolean success = this.queue.offer(event);
if (!success) {
LOGGER.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event);
receiveEvent(event);
return true;
}
return true;
}Event consumption happens in receiveEvent:
void receiveEvent(Event event) {
final long currentEventSequence = event.sequence();
if (!hasSubscriber()) {
LOGGER.warn("[NotifyCenter] the {} is lost, because there is no subscriber.", event.getClass());
return;
}
for (Subscriber subscriber : subscribers) {
if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {
LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire", event.getClass());
continue;
}
notifySubscriber(subscriber, event);
}
}Subscribers are registered during NacosNamingService.init via:
// Register subscriber to publisher
NotifyCenter.registerSubscriber(changeNotifier);The underlying addSubscriber method links the event type, publisher, and subscriber:
private static void addSubscriber(final Subscriber consumer, Class<? extends Event> subscribeType,
EventPublisherFactory factory) {
final String topic = ClassUtils.getCanonicalName(subscribeType);
synchronized (NotifyCenter.class) {
MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, factory, subscribeType, ringBufferSize);
}
EventPublisher publisher = INSTANCE.publisherMap.get(topic);
if (publisher instanceof ShardedEventPublisher) {
((ShardedEventPublisher) publisher).addSubscriber(consumer, subscribeType);
} else {
publisher.addSubscriber(consumer);
}
}When notifying, notifySubscriber either uses the subscriber’s custom Executor or runs the callback directly:
@Override
public void notifySubscriber(final Subscriber subscriber, final Event event) {
LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber);
final Runnable job = () -> subscriber.onEvent(event);
final Executor executor = subscriber.executor();
if (executor != null) {
executor.execute(job);
} else {
try {
job.run();
} catch (Throwable e) {
LOGGER.error("Event callback exception: ", e);
}
}
}Summary
The Nacos client’s service subscription mechanism relies on a periodic task that updates ServiceInfo, detects changes, and publishes an InstancesChangeEvent through NotifyCenter. NotifyCenter maps event types to EventPublisher instances, with the default being a daemon DefaultPublisher that queues events in a blocking queue and processes them in a dedicated thread, ultimately invoking registered listeners.
Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactand we will review it promptly.
Senior Brother's Insights
A public account focused on workplace, career growth, team management, and self-improvement. The author is the writer of books including 'SpringBoot Technology Insider' and 'Drools 8 Rule Engine: Core Technology and Practice'.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
