10、Nacos服务订阅服务端源码分析

news/2024/9/13 3:25:08

本文收录于专栏 Nacos
推荐阅读:Nacos 架构 & 原理

文章目录

  • 前言
  • 一、RequestHandlerRegistry
  • 二、SubscribeServiceRequestHandler
  • 三、EphemeralClientOperationServiceImpl
  • 四、ClientServiceIndexesManager
  • 总结


前言

本篇开始学习Nacos服务订阅相关的源码

一、RequestHandlerRegistry

再前边看客户端注册到Nacos时,我们讲过一部分grpc的逻辑。GrpcRequestAcceptor会接收grpc请求,统一处理,源码如下:

@Autowired
RequestHandlerRegistry requestHandlerRegistry;
...
RequestHandler requestHandler = requestHandlerRegistry.getByRequestType(type);
...
Response response = requestHandler.handleRequest(request, requestMeta);

可以看到RequestHandlerRegistry根据不同的请求类型获取了相应的RequestHandler去处理请求,那么我们看下这个类的源码是如何组装请求类型和请求处理器RequestHandler的对应关系的。

@Service
public class RequestHandlerRegistry implements ApplicationListener<ContextRefreshedEvent> {
    
    Map<String, RequestHandler> registryHandlers = new HashMap<>();
	...

	@Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        Map<String, RequestHandler> beansOfType = event.getApplicationContext().getBeansOfType(RequestHandler.class);
        ...
        //将处理器保存到本地缓存
        registryHandlers.putIfAbsent(tClass.getSimpleName(), requestHandler);
        }
    }
}

可以看到,RequestHandlerRegistry实现了ApplicationListener,监听了springContextRefreshedEvent事件,在处理事件时将所有RequestHandler的实现保存到本地缓存registryHandlers中。

二、SubscribeServiceRequestHandler

@Override
@Secured(action = ActionTypes.READ)
public SubscribeServiceResponse handle(SubscribeServiceRequest request, RequestMeta meta) throws NacosException {
    String namespaceId = request.getNamespace();
    String serviceName = request.getServiceName();
    String groupName = request.getGroupName();
    String app = request.getHeader("app", "unknown");
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    Service service = Service.newService(namespaceId, groupName, serviceName, true);
    Subscriber subscriber = new Subscriber(meta.getClientIp(), meta.getClientVersion(), app, meta.getClientIp(),
            namespaceId, groupedServiceName, 0, request.getClusters());
    ServiceInfo serviceInfo = ServiceUtil.selectInstancesWithHealthyProtection(serviceStorage.getData(service),
            metadataManager.getServiceMetadata(service).orElse(null), subscriber.getCluster(), false,
            true, subscriber.getIp());
    if (request.isSubscribe()) {
    	//订阅
        clientOperationService.subscribeService(service, subscriber, meta.getConnectionId());
        NotifyCenter.publishEvent(new SubscribeServiceTraceEvent(System.currentTimeMillis(),
                meta.getClientIp(), service.getNamespace(), service.getGroup(), service.getName()));
    } else {
    	//退订
        clientOperationService.unsubscribeService(service, subscriber, meta.getConnectionId());
        NotifyCenter.publishEvent(new UnsubscribeServiceTraceEvent(System.currentTimeMillis(),
                meta.getClientIp(), service.getNamespace(), service.getGroup(), service.getName()));
    }
    return new SubscribeServiceResponse(ResponseCode.SUCCESS.getCode(), "success", serviceInfo);
}

我们从中发现一个熟悉的类EphemeralClientOperationServiceImpl clientOperationService,这个类目前除了客户端注册之外还处理了客户端订阅。

三、EphemeralClientOperationServiceImpl

@Override
public void subscribeService(Service service, Subscriber subscriber, String clientId) {
    Service singleton = ServiceManager.getInstance().getSingletonIfExist(service).orElse(service);
    Client client = clientManager.getClient(clientId);
    if (!clientIsLegal(client, clientId)) {
        return;
    }
    client.addServiceSubscriber(singleton, subscriber);
    client.setLastUpdatedTime();
    NotifyCenter.publishEvent(new ClientOperationEvent.ClientSubscribeServiceEvent(singleton, clientId));
}

这里的处理逻辑如下:

  • 根据clientIdClientManager中获取客户端对象Client
    • 如果可以获取客户端对象,则说明当前客户端已经处理过订阅逻辑,直接返回
  • 发布客户端订阅事件ClientOperationEvent.ClientSubscribeServiceEvent

四、ClientServiceIndexesManager

private void handleClientOperation(ClientOperationEvent event) {
    Service service = event.getService();
    String clientId = event.getClientId();
    if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {
        //处理客户端注册事件
        addPublisherIndexes(service, clientId);
    } else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {
        removePublisherIndexes(service, clientId);
    } else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {
        //处理客户单订阅事件
        addSubscriberIndexes(service, clientId);
    } else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {
        removeSubscriberIndexes(service, clientId);
    }
}

查找客户单订阅事件ClientOperationEvent.ClientSubscribeServiceEvent的处理类,我们再次看到了熟悉的类ClientServiceIndexesManager。看看addSubscriberIndexes是如何处理客户端订阅事件的:

private final ConcurrentMap<Service, Set<String>> subscriberIndexes = new ConcurrentHashMap<>();

private void addSubscriberIndexes(Service service, String clientId) {
    subscriberIndexes.computeIfAbsent(service, key -> new ConcurrentHashSet<>());
    // Fix #5404, Only first time add need notify event.
    if (subscriberIndexes.get(service).add(clientId)) {
        NotifyCenter.publishEvent(new ServiceEvent.ServiceSubscribedEvent(service, clientId));
    }
}

同客户端注册逻辑一样,对于客户端订阅也是采取了同样的实现方式,那就是使用一个ConcurrentHashMap来存储客户端的订阅信息。

  • key:订阅的service
  • value:订阅指定service的客户端集合

总结

通过以上代码我们可以看出,比起客户端注册,客户端订阅的代码在整体结构设计上更加简单。得益于将客户端逻辑在整体上设计的高内聚,所以除了SubscribeServiceRequestHandler这个RequestHandler的实现之外,我们没有接触新的类。


http://lihuaxi.xjx100.cn/news/1595778.html

相关文章

揭开黑客的神秘面纱:黑客文化、技术手段与防御策略

目录 1. 引言1.1 黑客的定义与起源1.2 黑客文化的形成与传承 2. 黑客的分类与目标2.1 道德黑客与恶意黑客2.2 黑客攻击的目标与动机解析 3. 黑客的技术手段3.1 网络入侵与渗透测试3.2 社会工程学与钓鱼攻击3.3 恶意软件与病毒传播3.4 数据泄露与身份盗窃 4. 防御黑客攻击的策略…

大模型遇上数智化,腾讯云与行业专家共探行业AI发展之路

引言 自去年底 OpenAI 推出 ChatGPT 起&#xff0c;大模型作为新的生产工具登上了新一轮生产力革命的舞台。事实上&#xff0c;数十年来历经了多次起落的 AI 技术&#xff0c;尽管一直被赋予着极高的期待&#xff0c;但在落地产业端时&#xff0c;却总显得差强人意。大模型的爆…

组件的挂载和渲染

React的挂载和渲染 React的生命周期中包括三个主要的阶段&#xff1a;挂载、渲染以及卸载。 很多小伙伴包括我自己可能对挂载和渲染的概念比较模糊&#xff0c;今天这篇文章主要的目的是为了解答我们的这个小疑惑~ 这张图是从其他地方搬运过来的&#xff0c;这张图中描述的主…

ES6 class类关键字super

super关键字 在 JavaSCript 中&#xff0c;能通过 extends 关键字去继承父类 super 关键字在子类中有以下用法&#xff1a; 当成函数调用 super() 作为 "属性查询" super.prop 和 super[expr] super() super 作为函数调用时&#xff0c;代表父类的构造函数。 ES6 要求…

【C++】指针与引用(学习笔记)

一、左值与右值 左值&#xff1a;编译器为其单独分配了一块存储空间&#xff0c;可以取其地址的&#xff0c;可以放在赋值运算符左边 右值&#xff1a;数据本身。不能取到其自身地址&#xff0c;只能赋值运算右边 左值最常见的情况如西数和数据成员的名字 右值是没有标识符、…

百度网盘资源连接记录

https://pan.baidu.com/doc/share/vR49ums3cpPpBcy8wbFYLA-884077696320220

【Godot】时间线(技能)节点

4.1 游戏中一般都会有各种各样的技能&#xff0c;或者其他需要按一定的时间顺序去执行的功能。 这里我写出了一个时间线节点&#xff0c;就像是在播放动画一样&#xff0c;按一定的阶段去执行某些功能 # # Timeline # # - author: zhangxuetu # - datetime: 2023-09-24 23…