21-Dubbo3消费者引用服务入口

    这段代码我们前面详细说了服务引用的配置ReferenceConfig和Dubbo启动器启动应用的过程DubboBootstrap,后面我们直接定位到消费者引用服务的代码位置来看。

    第一个要关注的就是模块发布器DefaultModuleDeployer的start方法,这个start方法包含了Dubbo应用启动的过程

    DefaultModuleDeployer的start方法

    1. ...省略掉若干代码
    2. onModuleStarting();
    3. // initialize
    4. applicationDeployer.initialize();
    5. initialize();
    6. // export services
    7. exportServices();
    8. // prepare application instance
    9. // exclude internal module to avoid wait itself
    10. if (moduleModel != moduleModel.getApplicationModel().getInternalModule()) {
    11. applicationDeployer.prepareInternalModule();
    12. }
    13. // refer services
    14. referServices();
    15. ...省略掉若干代码
    16. return startFuture;
    17. }

    这个方法大部分代码已经省略,也不会详细去说了,感兴趣的可以看之前讲到的博客,这里主要来看引用服务方法referServices

    21.2.2 DefaultModuleDeployer的referServices方法

    下面就要来看消费者应用如何引用的服务的入口了,这个方法主要从大的方面做了一些服务引用生命周期的代码,看懂了这个方法我们就可以不依赖Dubbo负载的启动逻辑可以单独调用ReferenceConfigBase类型的对应方法来刷新,启动,销毁引用的服务了这里先来看下代码再详细介绍内容:

    DefaultModuleDeployer的referServices方法

    1. private void referServices() {
    2. //这个是获取配置的所有的ReferenceConfigBase类型对象
    3. configManager.getReferences().forEach(rc -> {
    4. try {
    5. ReferenceConfig<?> referenceConfig = (ReferenceConfig<?>) rc;
    6. if (!referenceConfig.isRefreshed()) {
    7. //刷新引用配置
    8. referenceConfig.refresh();
    9. }
    10. if (rc.shouldInit()) {
    11. if (referAsync || rc.shouldReferAsync()) {
    12. ExecutorService executor = executorRepository.getServiceReferExecutor();
    13. CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    14. try {
    15. //间接的通过缓存对象来引用服务配置
    16. referenceCache.get(rc);
    17. } catch (Throwable t) {
    18. logger.error(getIdentifier() + " refer async catch error : " + t.getMessage(), t);
    19. }
    20. }, executor);
    21. asyncReferringFutures.add(future);
    22. } else {
    23. //间接的通过缓存对象来引用服务配置
    24. referenceCache.get(rc);
    25. }
    26. }
    27. } catch (Throwable t) {
    28. logger.error(getIdentifier() + " refer catch error.");
    29. //出现异常销毁引用配置
    30. referenceCache.destroy(rc);
    31. throw t;
    32. }
    33. });
    34. }

    在这个代码中我们核心需要关心的就是SimpleReferenceCache类型的get方法了,在获取服务对象之外包装了一层缓存。

    如果出现了异常则执行referenceCache的destroy方法进行销毁引用配置。

    21.3.1 SimpleReferenceCache是什么?

    一个用于缓存引用ReferenceConfigBase的util工具类。
    ReferenceConfigBase是一个重对象,对于频繁创建ReferenceConfigBase的框架来说,有必要缓存这些对象。
    如果需要使用复杂的策略,可以实现并使用自己的ReferenceConfigBase缓存
    这个Cache是引用服务的开始如果我们想在代码中自定义一些服务引用的逻辑,可以直接创建SimpleReferenceCache类型对象然后调用其get方法进行引用服务。那这个缓存对象是和缓存与引用服务的可以继续往下看。

    1. @Override
    2. @SuppressWarnings("unchecked")
    3. public <T> T get(ReferenceConfigBase<T> rc) {
    4. //这个生成的key规则是这样的 服务分组/服务接口:版本号 详细的代码就不看了
    5. //例如: group/link.elastic.dubbo.entity.DemoService:1.0
    6. String key = generator.generateKey(rc);
    7. //服务类型 如果是泛化调用则这个类型为GenericService
    8. Class<?> type = rc.getInterfaceClass();
    9. //服务是否为单例的这里默认值都为空,为单例模式
    10. boolean singleton = rc.getSingleton() == null || rc.getSingleton();
    11. T proxy = null;
    12. // Check existing proxy of the same 'key' and 'type' first.
    13. if (singleton) {
    14. //一般为单例的 这个方法是从缓存中获取
    15. proxy = get(key, (Class<T>) type);
    16. } else {
    17. //非单例容易造成内存泄露,无法从缓存中获取
    18. logger.warn("Using non-singleton ReferenceConfig and ReferenceCache at the same time may cause memory leak. " +
    19. "Call ReferenceConfig#get() directly for non-singleton ReferenceConfig instead of using ReferenceCache#get(ReferenceConfig)");
    20. }
    21. //前面是从缓存中拿,如果缓存中获取不到则开始引用服务
    22. //获取或者创建值,为引用类型referencesOfType对象(类型为Map<Class<?>, List<ReferenceConfigBase<?>>>)缓存对象生成值(值不存咋时候会生成一个)
    23. List<ReferenceConfigBase<?>> referencesOfType = referenceTypeMap.computeIfAbsent(type, _t -> Collections.synchronizedList(new ArrayList<>()));
    24. //每次走到这里都会添加一个ReferenceConfigBase 引用配置对象(单例的从缓存中拿到就可以直接返回了)
    25. referencesOfType.add(rc);
    26. //与前面一样 前面是类型映射,这里是key映射
    27. referenceConfigList.add(rc);
    28. //开始引用服务
    29. proxy = rc.get();
    30. }
    31. return proxy;
    32. }

    可以看到这个逻辑使用了享元模式(其实就是先查缓存,缓存不存在则创建对象存入缓存)来进行引用对象的管理这样一个过程,这里一共有两个缓存对象referencesOfType和referenceConfigList
    key分别为引用类型和引用的服务的key,值是引用服务的基础配置对象列表List<ReferenceConfigBase<?>>

    后面可以详细看下如果借助ReferenceConfigBase类型对象来进行具体类型的引用。

    21.4.1 初始化引用服务的调用入口

    引用服务的逻辑其实是相对复杂一点的,包含了服务发现,引用对象的创建等等,接下来就让我们详细看下:

    ReferenceConfig类型的get方法

    这里有一段代码是:getScopeModel().getDeployer().start();
    这个前面已经调用了模块发布器启动过了,这里有这么一行代码是有一定作用的,如果使用方直接调用了ReferenceConfigBase的get方法或者缓存对象SimpleReferenceCache类型的对象的get方法来引用服务端的时候就会造成很多配置没有初始化下面执行逻辑的时候出现问题,这个代码其实就是启动模块进行一些基础配置的初始化操作 比如元数据中心默认配置选择,注册中心默认配置选择这些都是比较重要的。

    另外可以看到的是这里使用了双重校验锁来保证单例对象的创建,发现Dubbo种大量的使用了双重校验锁的逻辑。

    21.4.2 初始化引用服务

    这个就直接看代码了这,初始化过程相对复杂一点,我们一点点来看
    ReferenceConfig类型init()方法

    1. protected synchronized void init() {
    2. //初始化标记变量保证只初始化一次,这里又是加锁🔐又是加标记变量的
    3. if (initialized) {
    4. return;
    5. }
    6. initialized = true;
    7. //刷新配置
    8. if (!this.isRefreshed()) {
    9. this.refresh();
    10. }
    11. // init serviceMetadata
    12. //初始化ServiceMetadata类型对象serviceMetadata 为其设置服务基本属性比如版本号,分组,服务接口名
    13. initServiceMetadata(consumer);
    14. //继续初始化元数据信息 服务接口类型和key
    15. serviceMetadata.setServiceType(getServiceInterfaceClass());
    16. // TODO, uncomment this line once service key is unified
    17. serviceMetadata.setServiceKey(URL.buildKey(interfaceName, group, version));
    18. //配置转Map类型
    19. Map<String, String> referenceParameters = appendConfig();
    20. // init service-application mapping
    21. //来自本地存储和url参数的初始化映射。 参数转URL配置初始化 Dubbo中喜欢用url作为配置的一种处理方式
    22. initServiceAppsMapping(referenceParameters);
    23. //本地内存模块服务存储库
    24. ModuleServiceRepository repository = getScopeModel().getServiceRepository();
    25. //ServiceModel和ServiceMetadata在某种程度上是相互重复的。我们将来应该合并它们。
    26. ServiceDescriptor serviceDescriptor;
    27. if (CommonConstants.NATIVE_STUB.equals(getProxy())) {
    28. serviceDescriptor = StubSuppliers.getServiceDescriptor(interfaceName);
    29. repository.registerService(serviceDescriptor);
    30. } else {
    31. //本地存储库注册服务接口类型
    32. serviceDescriptor = repository.registerService(interfaceClass);
    33. }
    34. //消费者模型对象
    35. consumerModel = new ConsumerModel(serviceMetadata.getServiceKey(), proxy, serviceDescriptor, this,
    36. getScopeModel(), serviceMetadata, createAsyncMethodInfo());
    37. //本地存储库注册消费者模型对象
    38. repository.registerConsumer(consumerModel);
    39. //与前面代码一样基础初始化服务元数据对象为其设置附加参数
    40. serviceMetadata.getAttachments().putAll(referenceParameters);
    41. //创建服务的代理对象 !!!核心代码在这里
    42. ref = createProxy(referenceParameters);
    43. //为服务元数据对象设置代理对象
    44. serviceMetadata.setTarget(ref);
    45. serviceMetadata.addAttribute(PROXY_CLASS_REF, ref);
    46. consumerModel.setProxyObject(ref);
    47. consumerModel.initMethodModels();
    48. //检查invoker对象初始结果
    49. checkInvokerAvailable();
    50. }

    这里就要继续看 ReferenceConfig类型的创建代理方法createProxy了
    直接贴一下源码:

    1. private T createProxy(Map<String, String> referenceParameters) {
    2. //本地引用 这里为false
    3. if (shouldJvmRefer(referenceParameters)) {
    4. createInvokerForLocal(referenceParameters);
    5. } else {
    6. urls.clear();
    7. if (StringUtils.isNotEmpty(url)) {
    8. //url存在则为点对点引用
    9. // user specified URL, could be peer-to-peer address, or register center's address.
    10. parseUrl(referenceParameters);
    11. } else {
    12. // if protocols not in jvm checkRegistry
    13. //这里不是local协议默认这里为空
    14. if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {
    15. //从注册表中获取URL并将其聚合。这个其实就是初始化一下注册中心的url配置
    16. aggregateUrlFromRegistry(referenceParameters);
    17. }
    18. }
    19. //这个代码非常重要 创建远程引用,创建远程引用调用器
    20. createInvokerForRemote();
    21. }
    22. if (logger.isInfoEnabled()) {
    23. logger.info("Referred dubbo service: [" + referenceParameters.get(INTERFACE_KEY) + "]." +
    24. }
    25. URL consumerUrl = new ServiceConfigURL(CONSUMER_PROTOCOL, referenceParameters.get(REGISTER_IP_KEY), 0,
    26. referenceParameters.get(INTERFACE_KEY), referenceParameters);
    27. consumerUrl = consumerUrl.setScopeModel(getScopeModel());
    28. consumerUrl = consumerUrl.setServiceModel(consumerModel);
    29. MetadataUtils.publishServiceDefinition(consumerUrl, consumerModel.getServiceModel(), getApplicationModel());
    30. // create service proxy
    31. return (T) proxyFactory.getProxy(invoker, ProtocolUtils.isGeneric(generic));
    32. }

    21.5.2 创建远程引用,创建远程引用调用器

    ReferenceConfig类型的createInvokerForRemote方法

    1. private void createInvokerForRemote() {
    2. //这个url 为注册协议如registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-api-consumer&dubbo=2.0.2&pid=6204&qos.enable=false&qos.port=-1&registry=zookeeper&release=3.0.9&timestamp=1657439419495
    3. if (urls.size() == 1) {
    4. URL curUrl = urls.get(0);
    5. //这个SPI对象是由字节码动态生成的自适应对象Protocol$Adaptie直接看看不到源码,后续可以解析一个字节码生成的类型,这里后续来调用链路即可
    6. invoker = protocolSPI.refer(interfaceClass, curUrl);
    7. if (!UrlUtils.isRegistry(curUrl)) {
    8. List<Invoker<?>> invokers = new ArrayList<>();
    9. invokers.add(invoker);
    10. invoker = Cluster.getCluster(scopeModel, Cluster.DEFAULT).join(new StaticDirectory(curUrl, invokers), true);
    11. }
    12. } else {
    13. List<Invoker<?>> invokers = new ArrayList<>();
    14. URL registryUrl = null;
    15. for (URL url : urls) {
    16. // For multi-registry scenarios, it is not checked whether each referInvoker is available.
    17. // Because this invoker may become available later.
    18. invokers.add(protocolSPI.refer(interfaceClass, url));
    19. if (UrlUtils.isRegistry(url)) {
    20. // use last registry url
    21. registryUrl = url;
    22. }
    23. }
    24. if (registryUrl != null) {
    25. // registry url is available
    26. // for multi-subscription scenario, use 'zone-aware' policy by default
    27. String cluster = registryUrl.getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME);
    28. // The invoker wrap sequence would be: ZoneAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker
    29. // (RegistryDirectory, routing happens here) -> Invoker
    30. invoker = Cluster.getCluster(registryUrl.getScopeModel(), cluster, false).join(new StaticDirectory(registryUrl, invokers), false);
    31. } else {
    32. // not a registry url, must be direct invoke.
    33. if (CollectionUtils.isEmpty(invokers)) {
    34. throw new IllegalArgumentException("invokers == null");
    35. }
    36. URL curUrl = invokers.get(0).getUrl();
    37. String cluster = curUrl.getParameter(CLUSTER_KEY, Cluster.DEFAULT);
    38. invoker = Cluster.getCluster(scopeModel, cluster).join(new StaticDirectory(curUrl, invokers), true);
    39. }
    40. }
    41. }

    21.5.3 Invoker对象创建的全过程

    • Protocol$Adaptie的refer方法
    • ProtocolSerializationWrapper AOP类型的协议序列化器refer方法
    • ProtocolFilterWrapper AOP类型的协议过滤器的refer方法
    • QosProtocolWrapper AOP类型的QOS协议包装器的refer方法
    • ProtocolListenerWrapper APO类型监听器包装器的refer方法
    • RegistryProtocol 注册协议的refer方法 (会添加容错逻辑)
    • RegistryProtocol 注册协议的doRefer方法(调用方法创建Invoker对象)

    这里我们不再详细说这个引用链的具体过程直接定位到RegistryProtocol中创建Invoker类型的地方。
    先来看RegistryProtocol类型的refer方法,如下代码所示:

    RegistryProtocol类型的refer方法

    RegistryProtocol类型的doRefer方法创建Invoker对象
    直接来看代码了

    1. protected <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url, Map<String, String> parameters) {
    2. Map<String, Object> consumerAttribute = new HashMap<>(url.getAttributes());
    3. consumerAttribute.remove(REFER_KEY);
    4. String p = isEmpty(parameters.get(PROTOCOL_KEY)) ? CONSUMER : parameters.get(PROTOCOL_KEY);
    5. URL consumerUrl = new ServiceConfigURL (
    6. p,
    7. null,
    8. null,
    9. parameters.get(REGISTER_IP_KEY),
    10. 0, getPath(parameters, type),
    11. parameters,
    12. consumerAttribute
    13. );
    14. url = url.putAttribute(CONSUMER_URL_KEY, consumerUrl);
    15. //重点看这一行 带迁移性质的Invoker对象
    16. ClusterInvoker<T> migrationInvoker = getMigrationInvoker(this, cluster, registry, type, url, consumerUrl);
    17. //这一行回来执行迁移规则创建应用级优先的服务发现Invoker对象
    18. return interceptInvoker(migrationInvoker, url, consumerUrl);
    19. }

    这里代码比较重要的其实只有两行getMigrationInvoker和interceptInvoker方法
    比较核心也是Dubbo3比较重要的消费者启动逻辑基本都在这个方法里面interceptInvoker,这个方法执行了消费者应用级发现和接口级发现迁移的逻辑,会自动帮忙决策一个Invoker类型对象,不过这个逻辑这里先简单看下,后续单独整个文章来说。

    这里我们先来看 ClusterInvoker对象的创建,下面先看代码:

    RegistryProtocol类型的getMigrationInvoker方法

    1. protected <T> ClusterInvoker<T> getMigrationInvoker(RegistryProtocol registryProtocol, Cluster cluster, Registry registry, Class<T> type, URL url, URL consumerUrl) {
    2. return new ServiceDiscoveryMigrationInvoker<T>(registryProtocol, cluster, registry, type, url, consumerUrl);
    3. }

    详细的逻辑这里就不再看了,我们继续看RegistryProtocol类型的interceptInvoker方法:

    具体代码如下:
    RegistryProtocol类型的interceptInvoker方法

    1. protected <T> Invoker<T> interceptInvoker(ClusterInvoker<T> invoker, URL url, URL consumerUrl) {
    2. //获取激活的注册协议监听器扩展里面registry.protocol.listener,这里激活的类型为MigrationRuleListener
    3. List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
    4. if (CollectionUtils.isEmpty(listeners)) {
    5. return invoker;
    6. }
    7. for (RegistryProtocolListener listener : listeners) {
    8. //这里执行MigrationRuleListener类型的onRefer方法
    9. listener.onRefer(this, invoker, consumerUrl, url);
    10. }
    11. }

    该方法尝试加载所有RegistryProtocolListener定义,这些定义通过与定义的交互来控制调用器的行为,然后使用这些侦听器更改MigrationInvoker的状态和行为。
    当前可用的监听器是MigrationRuleListener,用于通过动态变化的规则控制迁移行为。

    Invoker对象的创建完成其实就代表了服务引用执行完成,不过这里核心的协议并没有来说